From 94bdabe622de52ef7cbefd8627e92b824cce1065 Mon Sep 17 00:00:00 2001 From: Lukas Date: Sun, 26 Apr 2026 14:04:21 +0200 Subject: [PATCH] TASK-016: Validate delete_log_path query parameter with allowlist - Extract path validation logic into shared helper function in backend/app/utils/path_utils.py (validate_log_path) - Refactor AddLogPathRequest to use the helper function - Apply the same validation to DELETE /api/config/jails/{name}/logpath endpoint by validating the log_path query parameter - Return HTTP 422 with descriptive error if validation fails - Add comprehensive unit tests for path validation - Update Backend-Development.md with usage examples This prevents path-traversal attacks on the delete_log_path endpoint by ensuring all log paths are within allowlisted directories. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Docs/Backend-Development.md | 40 ++++++-- Docs/Tasks.md | 34 ------- backend/app/models/config.py | 25 +---- backend/app/routers/jail_config.py | 12 ++- backend/app/utils/path_utils.py | 40 ++++++++ backend/tests/test_models.py | 3 + backend/tests/test_path_utils.py | 149 +++++++++++++++++++++++++++++ 7 files changed, 236 insertions(+), 67 deletions(-) create mode 100644 backend/app/utils/path_utils.py create mode 100644 backend/tests/test_path_utils.py diff --git a/Docs/Backend-Development.md b/Docs/Backend-Development.md index 29565d8..f75ec3b 100644 --- a/Docs/Backend-Development.md +++ b/Docs/Backend-Development.md @@ -219,27 +219,47 @@ For fields that require complex validation (e.g., file paths that must be within ```python from pydantic import field_validator +from app.utils.path_utils import validate_log_path class AddLogPathRequest(BaseModel): log_path: str = Field(..., description="Absolute path to the log file to monitor.") @field_validator("log_path", mode="after") @classmethod - def validate_log_path(cls, value: str) -> str: + def validate_log_path_field(cls, value: str) -> str: """Validate that the log path is within allowed directories.""" - settings = get_settings() - resolved_path = Path(value).resolve() - for allowed_dir in settings.allowed_log_dirs: - if resolved_path.is_relative_to(Path(allowed_dir).resolve()): - return value - raise ValueError(f"Path {value!r} is outside allowed directories") + return validate_log_path(value) +``` + +**Path Validation Helper:** + +For query parameters and other contexts where Pydantic validators cannot be used directly, use the `validate_log_path()` helper from `app.utils.path_utils`: + +```python +from fastapi import HTTPException, status +from app.utils.path_utils import validate_log_path + +@router.delete("/{name}/logpath") +async def delete_log_path( + name: str, + log_path: str = Query(...), +) -> None: + try: + validate_log_path(log_path) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(e), + ) from e + # ... rest of handler ``` **Key points:** -- Use `mode="after"` to validate after Pydantic's basic type coercion. +- Use `mode="after"` in model validators to validate after Pydantic's basic type coercion. - Raise `ValueError` if validation fails; Pydantic converts it to an HTTP 400 response. -- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). Use `Path.is_relative_to()` to avoid bypasses like `/var/log_evil/file.log`. -- Resolve symlinks before validating to prevent symlink-based escapes. +- For query parameters that cannot use Pydantic validators, use the `validate_log_path()` helper and raise HTTP 422. +- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). The helper uses `Path.relative_to()` to prevent bypasses like `/var/log_evil/file.log`. +- Symlinks are resolved before validating to prevent symlink-based escapes. --- diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 7537e4b..58e0c0b 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -1,37 +1,3 @@ -## TASK-015 — `GlobalConfigUpdate.log_target`/`log_level` have no validation - -**Severity:** High - -### Where found -`backend/app/models/config.py` — `GlobalConfigUpdate`. `backend/app/services/config_service.py` — `update_global_config()`. - -### Why this is needed -`log_target` is forwarded raw to fail2ban via the Unix socket. fail2ban (running as root) creates or opens the file at that path if it does not exist. `log_level` is forwarded raw without checking it is a valid fail2ban log level. Both fields represent an injection path into fail2ban's internal state from an authenticated but potentially compromised account. - -### Goal -Validate both fields before forwarding to fail2ban. - -### What to do -1. Change `log_target` in `GlobalConfigUpdate` to accept only: - - `Literal["STDOUT", "STDERR", "SYSLOG"]`, or - - A path validated the same way as `AddLogPathRequest.log_path` (see TASK-014). -2. Change `log_level` to `Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]`. -3. Apply the same restrictions in `get_global_config` responses for consistency. - -### Possible traps and issues -- The allowlist for `log_target` paths must be consistent with TASK-014 (`BANGUI_ALLOWED_LOG_DIRS`). -- Existing deployments using non-standard `log_target` values (e.g., `/var/log/fail2ban.log`) must still work — ensure `/var/log` is in the default allowlist. - -### Docs changes needed -- `Features.md` — document valid values for `log_target` and `log_level`. -- `Backend-Development.md` — Pydantic Literal types for constrained string fields. - -### Doc references -- [Features.md](Features.md) — global fail2ban configuration -- [Backend-Development.md](Backend-Development.md) — model validation - ---- - ## TASK-016 — `delete_log_path` query parameter unvalidated **Severity:** Medium diff --git a/backend/app/models/config.py b/backend/app/models/config.py index 34b1757..a3432f8 100644 --- a/backend/app/models/config.py +++ b/backend/app/models/config.py @@ -10,6 +10,7 @@ from typing import Literal from pydantic import BaseModel, ConfigDict, Field, field_validator from app.config import get_settings +from app.utils.path_utils import validate_log_path DNSMode = Literal["yes", "warn", "no", "raw"] LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"] @@ -295,12 +296,9 @@ class AddLogPathRequest(BaseModel): @field_validator("log_path", mode="after") @classmethod - def validate_log_path(cls, value: str) -> str: + def validate_log_path_field(cls, value: str) -> str: """Validate that the log path is within allowed directories. - Resolves the path to its canonical form (resolving symlinks) and checks - that it is relative to one of the allowed log directories from settings. - Args: value: The log path to validate. @@ -310,24 +308,7 @@ class AddLogPathRequest(BaseModel): Raises: ValueError: If the path is outside allowed log directories. """ - settings = get_settings() - try: - resolved_path = Path(value).resolve() - except (OSError, RuntimeError) as e: - raise ValueError(f"Cannot resolve path {value!r}: {e}") from e - - for allowed_dir in settings.allowed_log_dirs: - allowed_path = Path(allowed_dir).resolve() - try: - resolved_path.relative_to(allowed_path) - return value - except ValueError: - continue - - allowed_dirs_str = ", ".join(settings.allowed_log_dirs) - raise ValueError( - f"Log path {value!r} is outside allowed directories: {allowed_dirs_str}" - ) + return validate_log_path(value) diff --git a/backend/app/routers/jail_config.py b/backend/app/routers/jail_config.py index 817ce8c..c55d633 100644 --- a/backend/app/routers/jail_config.py +++ b/backend/app/routers/jail_config.py @@ -3,7 +3,7 @@ from __future__ import annotations import shlex from typing import Annotated -from fastapi import APIRouter, Path, Query, Request, status +from fastapi import APIRouter, HTTPException, Path, Query, Request, status from app.dependencies import ( AppDep, @@ -33,6 +33,7 @@ from app.services import ( filter_config_service, jail_config_service, ) +from app.utils.path_utils import validate_log_path from app.utils.runtime_state import ( clear_activation_record, clear_pending_recovery, @@ -248,10 +249,19 @@ async def delete_log_path( log_path: Absolute path to the log file to remove (query parameter). Raises: + HTTPException: 422 when the log path is outside allowed directories. HTTPException: 404 when the jail does not exist. HTTPException: 400 when the command is rejected. HTTPException: 502 when fail2ban is unreachable. """ + try: + validate_log_path(log_path) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(e), + ) from e + await config_service.delete_log_path(socket_path, name, log_path) diff --git a/backend/app/utils/path_utils.py b/backend/app/utils/path_utils.py new file mode 100644 index 0000000..2b2376a --- /dev/null +++ b/backend/app/utils/path_utils.py @@ -0,0 +1,40 @@ +"""Path validation utilities.""" + +from pathlib import Path + +from app.config import get_settings + + +def validate_log_path(log_path: str) -> str: + """Validate that a log path is within allowed directories. + + Resolves the path to its canonical form (resolving symlinks) and checks + that it is relative to one of the allowed log directories from settings. + + Args: + log_path: The log path to validate. + + Returns: + The validated log path (unchanged). + + Raises: + ValueError: If the path is outside allowed log directories. + """ + settings = get_settings() + try: + resolved_path = Path(log_path).resolve() + except (OSError, RuntimeError) as e: + raise ValueError(f"Cannot resolve path {log_path!r}: {e}") from e + + for allowed_dir in settings.allowed_log_dirs: + allowed_path = Path(allowed_dir).resolve() + try: + resolved_path.relative_to(allowed_path) + return log_path + except ValueError: + continue + + allowed_dirs_str = ", ".join(settings.allowed_log_dirs) + raise ValueError( + f"Log path {log_path!r} is outside allowed directories: {allowed_dirs_str}" + ) diff --git a/backend/tests/test_models.py b/backend/tests/test_models.py index 2fa3c6a..166adb4 100644 --- a/backend/tests/test_models.py +++ b/backend/tests/test_models.py @@ -22,6 +22,7 @@ def _mock_allowed_dirs(monkeypatch: pytest.MonkeyPatch) -> None: ) monkeypatch.setattr("app.models.config.get_settings", mock_get_settings) + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) def test_add_log_path_request_valid_in_var_log(_mock_allowed_dirs: None) -> None: @@ -90,6 +91,7 @@ def test_add_log_path_request_rejects_symlink_escape(monkeypatch: pytest.MonkeyP ) monkeypatch.setattr("app.models.config.get_settings", mock_get_settings) + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) with pytest.raises(ValidationError) as exc_info: AddLogPathRequest(log_path=str(symlink / "evil.log"), tail=True) @@ -132,6 +134,7 @@ def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatc ) monkeypatch.setattr("app.models.config.get_settings", mock_get_settings) + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) req = AddLogPathRequest(log_path="/custom/logs/app.log", tail=True) assert req.log_path == "/custom/logs/app.log" diff --git a/backend/tests/test_path_utils.py b/backend/tests/test_path_utils.py new file mode 100644 index 0000000..e8e6542 --- /dev/null +++ b/backend/tests/test_path_utils.py @@ -0,0 +1,149 @@ +"""Unit tests for path validation utilities.""" + +import tempfile +from pathlib import Path + +import pytest + +from app.config import Settings +from app.utils.path_utils import validate_log_path + + +@pytest.fixture +def _mock_settings(monkeypatch: pytest.MonkeyPatch) -> None: + """Mock get_settings to return test settings with default allowed directories.""" + def mock_get_settings() -> Settings: + return Settings( + database_path=":memory:", + fail2ban_socket="/tmp/fake.sock", + fail2ban_config_dir="/tmp/fail2ban", + session_secret="test-secret-key-do-not-use", + ) + + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) + + +def test_validate_log_path_valid_in_var_log(_mock_settings: None) -> None: + """Valid log paths in /var/log are accepted.""" + result = validate_log_path("/var/log/auth.log") + assert result == "/var/log/auth.log" + + +def test_validate_log_path_valid_in_config_log(_mock_settings: None) -> None: + """Valid log paths in /config/log are accepted.""" + result = validate_log_path("/config/log/app.log") + assert result == "/config/log/app.log" + + +def test_validate_log_path_valid_with_subdirectory(_mock_settings: None) -> None: + """Log paths in subdirectories of allowed paths are accepted.""" + result = validate_log_path("/var/log/syslog/auth.log") + assert result == "/var/log/syslog/auth.log" + + +def test_validate_log_path_rejects_path_outside_allowed(_mock_settings: None) -> None: + """Paths outside allowed directories are rejected.""" + with pytest.raises(ValueError) as exc_info: + validate_log_path("/etc/passwd") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + assert "/etc/passwd" in error_msg + + +def test_validate_log_path_rejects_home_directory(_mock_settings: None) -> None: + """Paths in home directories are rejected.""" + with pytest.raises(ValueError) as exc_info: + validate_log_path("/home/user/app.log") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + + +def test_validate_log_path_rejects_shadow_file(_mock_settings: None) -> None: + """Paths to sensitive files like /etc/shadow are rejected.""" + with pytest.raises(ValueError) as exc_info: + validate_log_path("/etc/shadow") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + + +def test_validate_log_path_rejects_symlink_escape(monkeypatch: pytest.MonkeyPatch) -> None: + """Symlinks that escape allowed directories are rejected.""" + with tempfile.TemporaryDirectory() as tmpdir: + allowed_dir = Path(tmpdir) / "allowed" + escape_dir = Path(tmpdir) / "escape" + allowed_dir.mkdir() + escape_dir.mkdir() + + symlink = allowed_dir / "escape_link" + symlink.symlink_to(escape_dir) + + def mock_get_settings() -> Settings: + return Settings( + database_path=":memory:", + fail2ban_socket="/tmp/fake.sock", + fail2ban_config_dir="/tmp/fail2ban", + session_secret="test-secret-key-do-not-use", + allowed_log_dirs=[str(allowed_dir)], + ) + + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) + + # Accessing the symlink resolves it to the escape_dir, which is outside the allowed_dir. + with pytest.raises(ValueError) as exc_info: + validate_log_path(str(symlink / "evil.log")) + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + + +def test_validate_log_path_rejects_prefix_bypass(_mock_settings: None) -> None: + """Paths that are similar to allowed paths but outside (e.g., /var/log_evil) are rejected.""" + with pytest.raises(ValueError) as exc_info: + validate_log_path("/var/log_evil/somefile.log") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + + +def test_validate_log_path_returns_unchanged_value(_mock_settings: None) -> None: + """The returned value is exactly what was passed in.""" + path = "/var/log/app.log" + result = validate_log_path(path) + assert result is path or result == path + + +def test_validate_log_path_rejects_custom_allowed_dir_outside( + _mock_settings: None, monkeypatch: pytest.MonkeyPatch +) -> None: + """Paths outside custom allowed directories are rejected.""" + def mock_get_settings() -> Settings: + return Settings( + database_path=":memory:", + fail2ban_socket="/tmp/fake.sock", + fail2ban_config_dir="/tmp/fail2ban", + session_secret="test-secret-key-do-not-use", + allowed_log_dirs=["/custom/logs"], + ) + + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) + + with pytest.raises(ValueError) as exc_info: + validate_log_path("/var/log/app.log") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + assert "/custom/logs" in error_msg + + +def test_validate_log_path_accepts_custom_allowed_dir(monkeypatch: pytest.MonkeyPatch) -> None: + """Paths within custom allowed directories are accepted.""" + def mock_get_settings() -> Settings: + return Settings( + database_path=":memory:", + fail2ban_socket="/tmp/fake.sock", + fail2ban_config_dir="/tmp/fail2ban", + session_secret="test-secret-key-do-not-use", + allowed_log_dirs=["/custom/logs"], + ) + + monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings) + + result = validate_log_path("/custom/logs/app.log") + assert result == "/custom/logs/app.log"