diff --git a/Docs/Backend-Development.md b/Docs/Backend-Development.md index 6dc58b8..29565d8 100644 --- a/Docs/Backend-Development.md +++ b/Docs/Backend-Development.md @@ -191,6 +191,56 @@ class BanResponse(BaseModel): ban_count: int = Field(..., ge=1, description="Number of times this IP was banned") ``` +### Using `Literal` Types for Constrained Strings + +When a field should only accept a small set of predefined values, use `Literal` to enforce this at the type level: + +```python +from typing import Literal +from pydantic import BaseModel, Field + +LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"] + +class GlobalConfigUpdate(BaseModel): + log_level: LogLevel | None = Field( + default=None, + description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.", + ) +``` + +This provides: +- **Type safety** — IDEs and type checkers enforce valid values. +- **API documentation** — OpenAPI docs automatically list all allowed values. +- **Validation** — Pydantic rejects invalid values and provides a clear error message. + +### Custom Field Validators + +For fields that require complex validation (e.g., file paths that must be within allowed directories), use `@field_validator`: + +```python +from pydantic import field_validator + +class AddLogPathRequest(BaseModel): + log_path: str = Field(..., description="Absolute path to the log file to monitor.") + + @field_validator("log_path", mode="after") + @classmethod + def validate_log_path(cls, value: str) -> str: + """Validate that the log path is within allowed directories.""" + settings = get_settings() + resolved_path = Path(value).resolve() + for allowed_dir in settings.allowed_log_dirs: + if resolved_path.is_relative_to(Path(allowed_dir).resolve()): + return value + raise ValueError(f"Path {value!r} is outside allowed directories") +``` + +**Key points:** +- Use `mode="after"` to validate after Pydantic's basic type coercion. +- Raise `ValueError` if validation fails; Pydantic converts it to an HTTP 400 response. +- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). Use `Path.is_relative_to()` to avoid bypasses like `/var/log_evil/file.log`. +- Resolve symlinks before validating to prevent symlink-based escapes. + --- ## 6. Async Rules diff --git a/Docs/Features.md b/Docs/Features.md index f891b00..5b711ae 100644 --- a/Docs/Features.md +++ b/Docs/Features.md @@ -228,8 +228,10 @@ A page to inspect and modify the fail2ban configuration without leaving the web ### Server Settings -- View and change the fail2ban log level (e.g. Critical, Error, Warning, Info, Debug). -- View and change the log target (file path, stdout, stderr, syslog, systemd journal). +- View and change the fail2ban log level using valid values: `CRITICAL`, `ERROR`, `WARNING`, `NOTICE`, `INFO`, `DEBUG`. +- View and change the log target, which can be: + - Special values: `STDOUT`, `STDERR`, `SYSLOG` + - A file path that resolves to one of the configured safe log directories (default: `/var/log` and `/config/log`). Symlinks are resolved to their canonical targets before validation. - View and change the syslog socket if syslog is used. - Flush and re-open log files (useful after log rotation). - View and change the fail2ban database file location. @@ -264,7 +266,7 @@ A page to inspect and modify the fail2ban configuration without leaving the web - **Auto-refresh** toggle with interval selector (5 s / 10 s / 30 s) for live monitoring. - Truncation notice when the total log file line count exceeds the requested tail limit. - Container automatically scrolls to the bottom after each data update. -- When fail2ban is configured to log to a non-file target (STDOUT, STDERR, SYSLOG, SYSTEMD-JOURNAL), an informational banner explains that file-based log viewing is unavailable. +- When fail2ban is configured to log to a non-file target (`STDOUT`, `STDERR`, or `SYSLOG`), an informational banner explains that file-based log viewing is unavailable. - Log file paths are validated against a configurable allowlist of safe directories on the backend to prevent unauthorized reads of sensitive system files. --- diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 51f8347..7537e4b 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -1,38 +1,3 @@ -## TASK-014 — `add_log_path` passes arbitrary paths to fail2ban — no allowlist - -**Severity:** High - -### Where found -`backend/app/services/config_service.py` — `add_log_path()`. `backend/app/models/config.py` — `AddLogPathRequest.log_path: str`. - -### Why this is needed -An authenticated user can instruct fail2ban to monitor any file path on the system (e.g., `log_path: "/etc/shadow"`). fail2ban runs as root and opens the file for reading. Even if fail2ban cannot meaningfully parse it, repeated log monitoring of sensitive files can leak their contents via fail2ban's own logging, and the feature represents an unintended read primitive into arbitrary root-readable files. - -### Goal -Restrict monitored log paths to a configurable set of safe directories. - -### What to do -1. Add a `@field_validator("log_path")` to `AddLogPathRequest` that: - - Calls `Path(log_path).resolve()` to canonicalize. - - Checks `resolved.is_relative_to(Path("/var/log"))` or any path in `settings.allowed_log_dirs` (a new configurable list). - - Raises `ValueError` if the path is outside allowed prefixes. -2. Add `BANGUI_ALLOWED_LOG_DIRS` to `Settings` as `list[str]` defaulting to `["/var/log", "/config/log"]`. -3. Note: use `is_relative_to()`, not `startswith()` — the latter is bypassable with `/var/log_evil`. - -### Possible traps and issues -- The validator runs on the Pydantic model before the service is called — the resolved path check happens at request time, not at the OS level. The allowed list must match the actual Docker volume mount paths. -- Custom log file locations (e.g., `/home/app/logs`) need to be added to `BANGUI_ALLOWED_LOG_DIRS`. - -### Docs changes needed -- `Features.md` — document the log path restrictions. -- `Backend-Development.md` — input validation for path parameters. - -### Doc references -- [Features.md](Features.md) — jail log monitoring -- [Backend-Development.md](Backend-Development.md) — path validation - ---- - ## TASK-015 — `GlobalConfigUpdate.log_target`/`log_level` have no validation **Severity:** High diff --git a/backend/app/models/config.py b/backend/app/models/config.py index 5173f51..34b1757 100644 --- a/backend/app/models/config.py +++ b/backend/app/models/config.py @@ -14,6 +14,8 @@ from app.config import get_settings DNSMode = Literal["yes", "warn", "no", "raw"] LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"] BackendType = Literal["auto", "polling", "pyinotify", "systemd", "gamin"] +LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"] +LogTarget = Literal["STDOUT", "STDERR", "SYSLOG"] # --------------------------------------------------------------------------- # Ban-time escalation @@ -177,28 +179,103 @@ class GlobalConfigResponse(BaseModel): model_config = ConfigDict(strict=True) - log_level: str - log_target: str + log_level: LogLevel + log_target: str = Field(..., description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.") db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.") db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.") + @field_validator("log_target", mode="after") + @classmethod + def validate_log_target(cls, value: str) -> str: + """Validate that log_target is either a special value or a valid file path. + + Args: + value: The log target to validate. + + Returns: + The validated log target. + + Raises: + ValueError: If the target is not a special value and not in allowed directories. + """ + if value.upper() in ("STDOUT", "STDERR", "SYSLOG"): + return value + + settings = get_settings() + try: + resolved_path = Path(value).resolve() + except (OSError, RuntimeError) as e: + raise ValueError(f"Cannot resolve path {value!r}: {e}") from e + + for allowed_dir in settings.allowed_log_dirs: + allowed_path = Path(allowed_dir).resolve() + try: + resolved_path.relative_to(allowed_path) + return value + except ValueError: + continue + + allowed_dirs_str = ", ".join(settings.allowed_log_dirs) + raise ValueError( + f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}" + ) + class GlobalConfigUpdate(BaseModel): """Payload for ``PUT /api/config/global``.""" model_config = ConfigDict(strict=True) - log_level: str | None = Field( + log_level: LogLevel | None = Field( default=None, - description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, DEBUG.", + description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.", ) log_target: str | None = Field( default=None, - description="Log target: STDOUT, STDERR, SYSLOG, SYSTEMD-JOURNAL, or a file path.", + description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.", ) db_purge_age: int | None = Field(default=None, ge=0) db_max_matches: int | None = Field(default=None, ge=0) + @field_validator("log_target", mode="after") + @classmethod + def validate_log_target(cls, value: str | None) -> str | None: + """Validate that log_target is either a special value or a valid file path. + + Args: + value: The log target to validate, or None. + + Returns: + The validated log target, or None if input was None. + + Raises: + ValueError: If the target is not a special value and not in allowed directories. + """ + if value is None: + return None + + if value.upper() in ("STDOUT", "STDERR", "SYSLOG"): + return value + + settings = get_settings() + try: + resolved_path = Path(value).resolve() + except (OSError, RuntimeError) as e: + raise ValueError(f"Cannot resolve path {value!r}: {e}") from e + + for allowed_dir in settings.allowed_log_dirs: + allowed_path = Path(allowed_dir).resolve() + try: + resolved_path.relative_to(allowed_path) + return value + except ValueError: + continue + + allowed_dirs_str = ", ".join(settings.allowed_log_dirs) + raise ValueError( + f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}" + ) + # --------------------------------------------------------------------------- # Log observation / preview models diff --git a/backend/tests/test_models.py b/backend/tests/test_models.py index bc9e81c..2fa3c6a 100644 --- a/backend/tests/test_models.py +++ b/backend/tests/test_models.py @@ -139,3 +139,150 @@ def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatc with pytest.raises(ValidationError): AddLogPathRequest(log_path="/var/log/app.log", tail=True) + + +# --------------------------------------------------------------------------- +# GlobalConfigUpdate and GlobalConfigResponse +# --------------------------------------------------------------------------- + + +def test_global_config_update_valid_log_level(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate accepts valid log levels.""" + from app.models.config import GlobalConfigUpdate + + for level in ["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]: + update = GlobalConfigUpdate(log_level=level) + assert update.log_level == level + + +def test_global_config_update_invalid_log_level(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate rejects invalid log levels.""" + from app.models.config import GlobalConfigUpdate + + with pytest.raises(ValidationError) as exc_info: + GlobalConfigUpdate(log_level="invalid") + error_msg = str(exc_info.value) + assert "CRITICAL" in error_msg + + +def test_global_config_update_log_level_case_sensitive(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate log_level is case-sensitive (must be uppercase).""" + from app.models.config import GlobalConfigUpdate + + with pytest.raises(ValidationError): + GlobalConfigUpdate(log_level="debug") + + with pytest.raises(ValidationError): + GlobalConfigUpdate(log_level="Debug") + + +def test_global_config_update_valid_log_target_special(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate accepts special log target values.""" + from app.models.config import GlobalConfigUpdate + + for target in ["STDOUT", "STDERR", "SYSLOG"]: + update = GlobalConfigUpdate(log_target=target) + assert update.log_target == target + + +def test_global_config_update_valid_log_target_path(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate accepts log targets that are valid file paths.""" + from app.models.config import GlobalConfigUpdate + + update = GlobalConfigUpdate(log_target="/var/log/fail2ban.log") + assert update.log_target == "/var/log/fail2ban.log" + + update = GlobalConfigUpdate(log_target="/config/log/app.log") + assert update.log_target == "/config/log/app.log" + + +def test_global_config_update_invalid_log_target_path(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate rejects log targets outside allowed directories.""" + from app.models.config import GlobalConfigUpdate + + with pytest.raises(ValidationError) as exc_info: + GlobalConfigUpdate(log_target="/etc/passwd") + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg + + +def test_global_config_update_log_target_case_insensitive(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate special log targets are accepted in any case.""" + from app.models.config import GlobalConfigUpdate + + update = GlobalConfigUpdate(log_target="stdout") + assert update.log_target == "stdout" + + update = GlobalConfigUpdate(log_target="STDERR") + assert update.log_target == "STDERR" + + +def test_global_config_update_none_fields(_mock_allowed_dirs: None) -> None: + """GlobalConfigUpdate allows None for optional fields.""" + from app.models.config import GlobalConfigUpdate + + update = GlobalConfigUpdate() + assert update.log_level is None + assert update.log_target is None + + +def test_global_config_response_log_level(_mock_allowed_dirs: None) -> None: + """GlobalConfigResponse enforces valid log levels.""" + from app.models.config import GlobalConfigResponse + + response = GlobalConfigResponse( + log_level="INFO", + log_target="STDOUT", + db_purge_age=86400, + db_max_matches=10, + ) + assert response.log_level == "INFO" + + with pytest.raises(ValidationError): + GlobalConfigResponse( + log_level="invalid", + log_target="STDOUT", + db_purge_age=86400, + db_max_matches=10, + ) + + +def test_global_config_response_log_target_special(_mock_allowed_dirs: None) -> None: + """GlobalConfigResponse accepts special log targets.""" + from app.models.config import GlobalConfigResponse + + response = GlobalConfigResponse( + log_level="INFO", + log_target="SYSLOG", + db_purge_age=86400, + db_max_matches=10, + ) + assert response.log_target == "SYSLOG" + + +def test_global_config_response_log_target_path(_mock_allowed_dirs: None) -> None: + """GlobalConfigResponse accepts validated log target paths.""" + from app.models.config import GlobalConfigResponse + + response = GlobalConfigResponse( + log_level="INFO", + log_target="/var/log/fail2ban.log", + db_purge_age=86400, + db_max_matches=10, + ) + assert response.log_target == "/var/log/fail2ban.log" + + +def test_global_config_response_log_target_invalid_path(_mock_allowed_dirs: None) -> None: + """GlobalConfigResponse rejects log targets outside allowed directories.""" + from app.models.config import GlobalConfigResponse + + with pytest.raises(ValidationError) as exc_info: + GlobalConfigResponse( + log_level="INFO", + log_target="/root/secret.log", + db_purge_age=86400, + db_max_matches=10, + ) + error_msg = str(exc_info.value) + assert "outside allowed directories" in error_msg diff --git a/backend/tests/test_services/test_config_service.py b/backend/tests/test_services/test_config_service.py index c9dc9b1..6b73d86 100644 --- a/backend/tests/test_services/test_config_service.py +++ b/backend/tests/test_services/test_config_service.py @@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, patch import pytest +from app.config import Settings from app.models.config import ( GlobalConfigUpdate, JailConfigListResponse, @@ -22,6 +23,25 @@ from app.services.config_service import ( JailNotFoundError, ) +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _mock_settings(monkeypatch: pytest.MonkeyPatch) -> None: + """Mock get_settings for all tests in this module.""" + def mock_get_settings() -> Settings: + return Settings( + database_path=":memory:", + fail2ban_socket="/tmp/fake.sock", + fail2ban_config_dir="/tmp/fail2ban", + session_secret="test-secret-key-do-not-use", + ) + + monkeypatch.setattr("app.models.config.get_settings", mock_get_settings) + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -472,7 +492,7 @@ class TestUpdateGlobalConfig: def __init__(self, **_kw: Any) -> None: self.send = AsyncMock(side_effect=_send) - update = GlobalConfigUpdate(log_level="debug", db_purge_age=3600) + update = GlobalConfigUpdate(log_level="DEBUG", db_purge_age=3600) with patch("app.services.config_service.Fail2BanClient", _FakeClient): await config_service.update_global_config(_SOCKET, update) @@ -492,7 +512,7 @@ class TestUpdateGlobalConfig: def __init__(self, **_kw: Any) -> None: self.send = AsyncMock(side_effect=_send) - update = GlobalConfigUpdate(log_level="debug") + update = GlobalConfigUpdate(log_level="DEBUG") with patch("app.services.config_service.Fail2BanClient", _FakeClient): await config_service.update_global_config(_SOCKET, update)