- Extract path validation logic into shared helper function in
backend/app/utils/path_utils.py (validate_log_path)
- Refactor AddLogPathRequest to use the helper function
- Apply the same validation to DELETE /api/config/jails/{name}/logpath
endpoint by validating the log_path query parameter
- Return HTTP 422 with descriptive error if validation fails
- Add comprehensive unit tests for path validation
- Update Backend-Development.md with usage examples
This prevents path-traversal attacks on the delete_log_path endpoint
by ensuring all log paths are within allowlisted directories.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
292 lines
11 KiB
Python
292 lines
11 KiB
Python
"""Unit tests for Pydantic models and their validators."""
|
|
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
from app.config import Settings
|
|
from app.models.config import AddLogPathRequest
|
|
|
|
|
|
@pytest.fixture
def _mock_allowed_dirs(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch ``get_settings`` so code under test sees a fixed ``Settings``.

    The replacement builds ``Settings`` without passing ``allowed_log_dirs``,
    so that field keeps whatever default ``Settings`` defines.  The lookup is
    replaced in both ``app.models.config`` and ``app.utils.path_utils``.
    """

    def _fake_get_settings() -> Settings:
        # A new Settings instance is constructed on every call.
        return Settings(
            database_path=":memory:",
            fail2ban_socket="/tmp/fake.sock",
            fail2ban_config_dir="/tmp/fail2ban",
            session_secret="test-secret-key-do-not-use",
        )

    for dotted_name in (
        "app.models.config.get_settings",
        "app.utils.path_utils.get_settings",
    ):
        monkeypatch.setattr(dotted_name, _fake_get_settings)
|
|
|
|
|
|
def test_add_log_path_request_valid_in_var_log(_mock_allowed_dirs: None) -> None:
    """A log file directly under ``/var/log`` passes validation unchanged."""
    expected_path = "/var/log/auth.log"

    request = AddLogPathRequest(log_path=expected_path, tail=True)

    assert request.log_path == expected_path
    assert request.tail is True
|
|
|
|
|
|
def test_add_log_path_request_valid_in_config_log(_mock_allowed_dirs: None) -> None:
    """A log file under ``/config/log`` passes validation, with ``tail`` kept False."""
    expected_path = "/config/log/app.log"

    request = AddLogPathRequest(log_path=expected_path, tail=False)

    assert request.log_path == expected_path
    assert request.tail is False
|
|
|
|
|
|
def test_add_log_path_request_valid_with_subdirectory(_mock_allowed_dirs: None) -> None:
    """A file nested one level below ``/var/log`` is accepted as well."""
    nested_path = "/var/log/syslog/auth.log"

    request = AddLogPathRequest(log_path=nested_path, tail=True)

    assert request.log_path == nested_path
|
|
|
|
|
|
def test_add_log_path_request_rejects_path_outside_allowed(_mock_allowed_dirs: None) -> None:
    """``/etc/passwd`` is refused and the error text names the offending path."""
    forbidden_path = "/etc/passwd"

    with pytest.raises(ValidationError) as exc_info:
        AddLogPathRequest(log_path=forbidden_path, tail=True)

    message = str(exc_info.value)
    assert "outside allowed directories" in message
    assert forbidden_path in message
|
|
|
|
|
|
def test_add_log_path_request_rejects_home_directory(_mock_allowed_dirs: None) -> None:
    """A log path below ``/home`` fails validation."""
    with pytest.raises(ValidationError) as exc_info:
        AddLogPathRequest(log_path="/home/user/app.log", tail=True)

    assert "outside allowed directories" in str(exc_info.value)
|
|
|
|
|
|
def test_add_log_path_request_rejects_shadow_file(_mock_allowed_dirs: None) -> None:
    """The sensitive file ``/etc/shadow`` fails validation."""
    with pytest.raises(ValidationError) as exc_info:
        AddLogPathRequest(log_path="/etc/shadow", tail=True)

    assert "outside allowed directories" in str(exc_info.value)
|
|
|
|
|
|
def test_add_log_path_request_rejects_symlink_escape(monkeypatch: pytest.MonkeyPatch) -> None:
    """A path routed through a symlink that leaves the allowed directory is refused.

    Layout created inside a temporary directory::

        allowed/escape_link -> escape/

    Only ``allowed`` is configured in ``allowed_log_dirs``; the requested
    path is ``allowed/escape_link/evil.log``.
    """
    with tempfile.TemporaryDirectory() as scratch:
        scratch_root = Path(scratch)
        allowed_dir = scratch_root / "allowed"
        escape_dir = scratch_root / "escape"
        for directory in (allowed_dir, escape_dir):
            directory.mkdir()

        link = allowed_dir / "escape_link"
        link.symlink_to(escape_dir)

        def _fake_get_settings() -> Settings:
            return Settings(
                database_path=":memory:",
                fail2ban_socket="/tmp/fake.sock",
                fail2ban_config_dir="/tmp/fail2ban",
                session_secret="test-secret-key-do-not-use",
                allowed_log_dirs=[str(allowed_dir)],
            )

        for dotted_name in (
            "app.models.config.get_settings",
            "app.utils.path_utils.get_settings",
        ):
            monkeypatch.setattr(dotted_name, _fake_get_settings)

        # The request must be made while the temporary tree still exists.
        requested_path = str(link / "evil.log")
        with pytest.raises(ValidationError) as exc_info:
            AddLogPathRequest(log_path=requested_path, tail=True)

        assert "outside allowed directories" in str(exc_info.value)
|
|
|
|
|
|
def test_add_log_path_request_validates_startswith_bypass(_mock_allowed_dirs: None) -> None:
    """``/var/log_evil`` shares a string prefix with ``/var/log`` but is still refused."""
    lookalike_path = "/var/log_evil/somefile.log"

    with pytest.raises(ValidationError) as exc_info:
        AddLogPathRequest(log_path=lookalike_path, tail=True)

    assert "outside allowed directories" in str(exc_info.value)
|
|
|
|
|
|
def test_add_log_path_request_default_tail_is_true(_mock_allowed_dirs: None) -> None:
    """Omitting ``tail`` leaves it set to ``True``."""
    request = AddLogPathRequest(log_path="/var/log/app.log")

    assert request.tail is True
|
|
|
|
|
|
def test_add_log_path_request_error_message_lists_allowed_dirs(_mock_allowed_dirs: None) -> None:
    """The rejection message mentions both ``/var/log`` and ``/config/log``."""
    with pytest.raises(ValidationError) as exc_info:
        AddLogPathRequest(log_path="/root/secret.log", tail=True)

    message = str(exc_info.value)
    assert "/var/log" in message
    assert "/config/log" in message
|
|
|
|
|
|
def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatch) -> None:
    """Validation follows ``allowed_log_dirs`` from the patched settings.

    With only ``/custom/logs`` and ``/another/path`` configured, a path under
    ``/custom/logs`` is accepted while one under ``/var/log`` is refused.
    """

    def _fake_get_settings() -> Settings:
        return Settings(
            database_path=":memory:",
            fail2ban_socket="/tmp/fake.sock",
            fail2ban_config_dir="/tmp/fail2ban",
            session_secret="test-secret-key-do-not-use",
            allowed_log_dirs=["/custom/logs", "/another/path"],
        )

    for dotted_name in (
        "app.models.config.get_settings",
        "app.utils.path_utils.get_settings",
    ):
        monkeypatch.setattr(dotted_name, _fake_get_settings)

    accepted_path = "/custom/logs/app.log"
    request = AddLogPathRequest(log_path=accepted_path, tail=True)
    assert request.log_path == accepted_path

    with pytest.raises(ValidationError):
        AddLogPathRequest(log_path="/var/log/app.log", tail=True)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GlobalConfigUpdate and GlobalConfigResponse
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_global_config_update_valid_log_level(_mock_allowed_dirs: None) -> None:
    """Each of the six upper-case level names is accepted by ``GlobalConfigUpdate``."""
    from app.models.config import GlobalConfigUpdate

    accepted_levels = ["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]

    for level_name in accepted_levels:
        model = GlobalConfigUpdate(log_level=level_name)
        assert model.log_level == level_name
|
|
|
|
|
|
def test_global_config_update_invalid_log_level(_mock_allowed_dirs: None) -> None:
    """An unknown level is refused and the error text mentions ``CRITICAL``."""
    from app.models.config import GlobalConfigUpdate

    with pytest.raises(ValidationError) as exc_info:
        GlobalConfigUpdate(log_level="invalid")

    assert "CRITICAL" in str(exc_info.value)
|
|
|
|
|
|
def test_global_config_update_log_level_case_sensitive(_mock_allowed_dirs: None) -> None:
    """Lower-case and capitalised spellings of a level name are both refused."""
    from app.models.config import GlobalConfigUpdate

    for wrong_case in ("debug", "Debug"):
        with pytest.raises(ValidationError):
            GlobalConfigUpdate(log_level=wrong_case)
|
|
|
|
|
|
def test_global_config_update_valid_log_target_special(_mock_allowed_dirs: None) -> None:
    """``STDOUT``, ``STDERR`` and ``SYSLOG`` are accepted as log targets."""
    from app.models.config import GlobalConfigUpdate

    special_targets = ["STDOUT", "STDERR", "SYSLOG"]

    for target_name in special_targets:
        model = GlobalConfigUpdate(log_target=target_name)
        assert model.log_target == target_name
|
|
|
|
|
|
def test_global_config_update_valid_log_target_path(_mock_allowed_dirs: None) -> None:
    """File paths under ``/var/log`` and ``/config/log`` are accepted as log targets."""
    from app.models.config import GlobalConfigUpdate

    for target_path in ("/var/log/fail2ban.log", "/config/log/app.log"):
        model = GlobalConfigUpdate(log_target=target_path)
        assert model.log_target == target_path
|
|
|
|
|
|
def test_global_config_update_invalid_log_target_path(_mock_allowed_dirs: None) -> None:
    """``/etc/passwd`` is refused as a log target."""
    from app.models.config import GlobalConfigUpdate

    with pytest.raises(ValidationError) as exc_info:
        GlobalConfigUpdate(log_target="/etc/passwd")

    assert "outside allowed directories" in str(exc_info.value)
|
|
|
|
|
|
def test_global_config_update_log_target_case_insensitive(_mock_allowed_dirs: None) -> None:
    """Special targets are accepted in lower and upper case, stored as given."""
    from app.models.config import GlobalConfigUpdate

    for spelling in ("stdout", "STDERR"):
        model = GlobalConfigUpdate(log_target=spelling)
        # The stored value keeps the caller's spelling.
        assert model.log_target == spelling
|
|
|
|
|
|
def test_global_config_update_none_fields(_mock_allowed_dirs: None) -> None:
    """Constructing ``GlobalConfigUpdate`` with no arguments leaves both fields ``None``."""
    from app.models.config import GlobalConfigUpdate

    empty_update = GlobalConfigUpdate()

    assert empty_update.log_level is None
    assert empty_update.log_target is None
|
|
|
|
|
|
def test_global_config_response_log_level(_mock_allowed_dirs: None) -> None:
    """``GlobalConfigResponse`` accepts ``INFO`` and refuses an unknown level."""
    from app.models.config import GlobalConfigResponse

    def _build(level: str) -> GlobalConfigResponse:
        # Only the log level varies between the two cases below.
        return GlobalConfigResponse(
            log_level=level,
            log_target="STDOUT",
            db_purge_age=86400,
            db_max_matches=10,
        )

    assert _build("INFO").log_level == "INFO"

    with pytest.raises(ValidationError):
        _build("invalid")
|
|
|
|
|
|
def test_global_config_response_log_target_special(_mock_allowed_dirs: None) -> None:
    """``SYSLOG`` is accepted as the log target of a ``GlobalConfigResponse``."""
    from app.models.config import GlobalConfigResponse

    special_target = "SYSLOG"

    model = GlobalConfigResponse(
        log_level="INFO",
        log_target=special_target,
        db_purge_age=86400,
        db_max_matches=10,
    )

    assert model.log_target == special_target
|
|
|
|
|
|
def test_global_config_response_log_target_path(_mock_allowed_dirs: None) -> None:
    """A file path under ``/var/log`` is accepted as a response log target."""
    from app.models.config import GlobalConfigResponse

    target_path = "/var/log/fail2ban.log"

    model = GlobalConfigResponse(
        log_level="INFO",
        log_target=target_path,
        db_purge_age=86400,
        db_max_matches=10,
    )

    assert model.log_target == target_path
|
|
|
|
|
|
def test_global_config_response_log_target_invalid_path(_mock_allowed_dirs: None) -> None:
    """``/root/secret.log`` is refused as a response log target."""
    from app.models.config import GlobalConfigResponse

    with pytest.raises(ValidationError) as exc_info:
        GlobalConfigResponse(
            log_level="INFO",
            log_target="/root/secret.log",
            db_purge_age=86400,
            db_max_matches=10,
        )

    assert "outside allowed directories" in str(exc_info.value)
|