TASK-015: Add validation for GlobalConfigUpdate.log_target and log_level

- Add LogLevel Literal type: CRITICAL, ERROR, WARNING, NOTICE, INFO, DEBUG
- Add log_target validation to accept special values (STDOUT, STDERR, SYSLOG)
  or validated file paths within allowed directories
- Update GlobalConfigResponse to use LogLevel type
- Add field_validator for log_target in both GlobalConfigUpdate and
  GlobalConfigResponse following the same pattern as AddLogPathRequest
- Add @autouse fixture to test_config_service.py to mock get_settings
- Update existing tests to use uppercase log level values
- Add 12 comprehensive tests for new validation in test_models.py
- Update Features.md to document valid log_target and log_level values
- Add section to Backend-Development.md documenting Literal types and
  field_validator patterns with examples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-26 13:57:22 +02:00
parent b9e046bd66
commit d66493f135
6 changed files with 306 additions and 45 deletions

View File

@@ -191,6 +191,56 @@ class BanResponse(BaseModel):
ban_count: int = Field(..., ge=1, description="Number of times this IP was banned") ban_count: int = Field(..., ge=1, description="Number of times this IP was banned")
``` ```
### Using `Literal` Types for Constrained Strings
When a field should only accept a small set of predefined values, use `Literal` to enforce this at the type level:
```python
from typing import Literal
from pydantic import BaseModel, Field
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]
class GlobalConfigUpdate(BaseModel):
log_level: LogLevel | None = Field(
default=None,
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
)
```
This provides:
- **Type safety** — IDEs and type checkers enforce valid values.
- **API documentation** — OpenAPI docs automatically list all allowed values.
- **Validation** — Pydantic rejects invalid values and provides a clear error message.
### Custom Field Validators
For fields that require complex validation (e.g., file paths that must be within allowed directories), use `@field_validator`:
```python
from pydantic import field_validator
class AddLogPathRequest(BaseModel):
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
@field_validator("log_path", mode="after")
@classmethod
def validate_log_path(cls, value: str) -> str:
"""Validate that the log path is within allowed directories."""
settings = get_settings()
resolved_path = Path(value).resolve()
for allowed_dir in settings.allowed_log_dirs:
if resolved_path.is_relative_to(Path(allowed_dir).resolve()):
return value
raise ValueError(f"Path {value!r} is outside allowed directories")
```
**Key points:**
- Use `mode="after"` to validate after Pydantic's basic type coercion.
- Raise `ValueError` if validation fails; Pydantic converts it to an HTTP 400 response.
- **Never use string prefix matching** for path validation (e.g., `path.startswith("/var/log")`). Use `Path.is_relative_to()` to avoid bypasses like `/var/log_evil/file.log`.
- Resolve symlinks before validating to prevent symlink-based escapes.
--- ---
## 6. Async Rules ## 6. Async Rules

View File

@@ -228,8 +228,10 @@ A page to inspect and modify the fail2ban configuration without leaving the web
### Server Settings ### Server Settings
- View and change the fail2ban log level (e.g. Critical, Error, Warning, Info, Debug). - View and change the fail2ban log level using valid values: `CRITICAL`, `ERROR`, `WARNING`, `NOTICE`, `INFO`, `DEBUG`.
- View and change the log target (file path, stdout, stderr, syslog, systemd journal). - View and change the log target, which can be:
- Special values: `STDOUT`, `STDERR`, `SYSLOG`
- A file path that resolves to one of the configured safe log directories (default: `/var/log` and `/config/log`). Symlinks are resolved to their canonical targets before validation.
- View and change the syslog socket if syslog is used. - View and change the syslog socket if syslog is used.
- Flush and re-open log files (useful after log rotation). - Flush and re-open log files (useful after log rotation).
- View and change the fail2ban database file location. - View and change the fail2ban database file location.
@@ -264,7 +266,7 @@ A page to inspect and modify the fail2ban configuration without leaving the web
- **Auto-refresh** toggle with interval selector (5 s / 10 s / 30 s) for live monitoring. - **Auto-refresh** toggle with interval selector (5 s / 10 s / 30 s) for live monitoring.
- Truncation notice when the total log file line count exceeds the requested tail limit. - Truncation notice when the total log file line count exceeds the requested tail limit.
- Container automatically scrolls to the bottom after each data update. - Container automatically scrolls to the bottom after each data update.
- When fail2ban is configured to log to a non-file target (STDOUT, STDERR, SYSLOG, SYSTEMD-JOURNAL), an informational banner explains that file-based log viewing is unavailable. - When fail2ban is configured to log to a non-file target (`STDOUT`, `STDERR`, or `SYSLOG`), an informational banner explains that file-based log viewing is unavailable.
- Log file paths are validated against a configurable allowlist of safe directories on the backend to prevent unauthorized reads of sensitive system files. - Log file paths are validated against a configurable allowlist of safe directories on the backend to prevent unauthorized reads of sensitive system files.
--- ---

View File

@@ -1,38 +1,3 @@
## TASK-014 — `add_log_path` passes arbitrary paths to fail2ban — no allowlist
**Severity:** High
### Where found
`backend/app/services/config_service.py``add_log_path()`. `backend/app/models/config.py``AddLogPathRequest.log_path: str`.
### Why this is needed
An authenticated user can instruct fail2ban to monitor any file path on the system (e.g., `log_path: "/etc/shadow"`). fail2ban runs as root and opens the file for reading. Even if fail2ban cannot meaningfully parse it, repeated log monitoring of sensitive files can leak their contents via fail2ban's own logging, and the feature represents an unintended read primitive into arbitrary root-readable files.
### Goal
Restrict monitored log paths to a configurable set of safe directories.
### What to do
1. Add a `@field_validator("log_path")` to `AddLogPathRequest` that:
- Calls `Path(log_path).resolve()` to canonicalize.
- Checks `resolved.is_relative_to(Path("/var/log"))` or any path in `settings.allowed_log_dirs` (a new configurable list).
- Raises `ValueError` if the path is outside allowed prefixes.
2. Add `BANGUI_ALLOWED_LOG_DIRS` to `Settings` as `list[str]` defaulting to `["/var/log", "/config/log"]`.
3. Note: use `is_relative_to()`, not `startswith()` — the latter is bypassable with `/var/log_evil`.
### Possible traps and issues
- The validator runs on the Pydantic model before the service is called — the resolved path check happens at request time, not at the OS level. The allowed list must match the actual Docker volume mount paths.
- Custom log file locations (e.g., `/home/app/logs`) need to be added to `BANGUI_ALLOWED_LOG_DIRS`.
### Docs changes needed
- `Features.md` — document the log path restrictions.
- `Backend-Development.md` — input validation for path parameters.
### Doc references
- [Features.md](Features.md) — jail log monitoring
- [Backend-Development.md](Backend-Development.md) — path validation
---
## TASK-015 — `GlobalConfigUpdate.log_target`/`log_level` have no validation ## TASK-015 — `GlobalConfigUpdate.log_target`/`log_level` have no validation
**Severity:** High **Severity:** High

View File

@@ -14,6 +14,8 @@ from app.config import get_settings
DNSMode = Literal["yes", "warn", "no", "raw"] DNSMode = Literal["yes", "warn", "no", "raw"]
LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"] LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"]
BackendType = Literal["auto", "polling", "pyinotify", "systemd", "gamin"] BackendType = Literal["auto", "polling", "pyinotify", "systemd", "gamin"]
LogLevel = Literal["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]
LogTarget = Literal["STDOUT", "STDERR", "SYSLOG"]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Ban-time escalation # Ban-time escalation
@@ -177,28 +179,103 @@ class GlobalConfigResponse(BaseModel):
model_config = ConfigDict(strict=True) model_config = ConfigDict(strict=True)
log_level: str log_level: LogLevel
log_target: str log_target: str = Field(..., description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.")
db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.") db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.")
db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.") db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.")
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str) -> str:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate.
Returns:
The validated log target.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
class GlobalConfigUpdate(BaseModel): class GlobalConfigUpdate(BaseModel):
"""Payload for ``PUT /api/config/global``.""" """Payload for ``PUT /api/config/global``."""
model_config = ConfigDict(strict=True) model_config = ConfigDict(strict=True)
log_level: str | None = Field( log_level: LogLevel | None = Field(
default=None, default=None,
description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, DEBUG.", description="Log level: CRITICAL, ERROR, WARNING, NOTICE, INFO, or DEBUG.",
) )
log_target: str | None = Field( log_target: str | None = Field(
default=None, default=None,
description="Log target: STDOUT, STDERR, SYSLOG, SYSTEMD-JOURNAL, or a file path.", description="Log target: STDOUT, STDERR, SYSLOG, or a validated file path.",
) )
db_purge_age: int | None = Field(default=None, ge=0) db_purge_age: int | None = Field(default=None, ge=0)
db_max_matches: int | None = Field(default=None, ge=0) db_max_matches: int | None = Field(default=None, ge=0)
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str | None) -> str | None:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate, or None.
Returns:
The validated log target, or None if input was None.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value is None:
return None
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Log observation / preview models # Log observation / preview models

View File

@@ -139,3 +139,150 @@ def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatc
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
AddLogPathRequest(log_path="/var/log/app.log", tail=True) AddLogPathRequest(log_path="/var/log/app.log", tail=True)
# ---------------------------------------------------------------------------
# GlobalConfigUpdate and GlobalConfigResponse
# ---------------------------------------------------------------------------
def test_global_config_update_valid_log_level(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate accepts valid log levels."""
from app.models.config import GlobalConfigUpdate
for level in ["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]:
update = GlobalConfigUpdate(log_level=level)
assert update.log_level == level
def test_global_config_update_invalid_log_level(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate rejects invalid log levels."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError) as exc_info:
GlobalConfigUpdate(log_level="invalid")
error_msg = str(exc_info.value)
assert "CRITICAL" in error_msg
def test_global_config_update_log_level_case_sensitive(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate log_level is case-sensitive (must be uppercase)."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError):
GlobalConfigUpdate(log_level="debug")
with pytest.raises(ValidationError):
GlobalConfigUpdate(log_level="Debug")
def test_global_config_update_valid_log_target_special(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate accepts special log target values."""
from app.models.config import GlobalConfigUpdate
for target in ["STDOUT", "STDERR", "SYSLOG"]:
update = GlobalConfigUpdate(log_target=target)
assert update.log_target == target
def test_global_config_update_valid_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate accepts log targets that are valid file paths."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate(log_target="/var/log/fail2ban.log")
assert update.log_target == "/var/log/fail2ban.log"
update = GlobalConfigUpdate(log_target="/config/log/app.log")
assert update.log_target == "/config/log/app.log"
def test_global_config_update_invalid_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate rejects log targets outside allowed directories."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError) as exc_info:
GlobalConfigUpdate(log_target="/etc/passwd")
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_global_config_update_log_target_case_insensitive(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate special log targets are accepted in any case."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate(log_target="stdout")
assert update.log_target == "stdout"
update = GlobalConfigUpdate(log_target="STDERR")
assert update.log_target == "STDERR"
def test_global_config_update_none_fields(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate allows None for optional fields."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate()
assert update.log_level is None
assert update.log_target is None
def test_global_config_response_log_level(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse enforces valid log levels."""
from app.models.config import GlobalConfigResponse
response = GlobalConfigResponse(
log_level="INFO",
log_target="STDOUT",
db_purge_age=86400,
db_max_matches=10,
)
assert response.log_level == "INFO"
with pytest.raises(ValidationError):
GlobalConfigResponse(
log_level="invalid",
log_target="STDOUT",
db_purge_age=86400,
db_max_matches=10,
)
def test_global_config_response_log_target_special(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse accepts special log targets."""
from app.models.config import GlobalConfigResponse
response = GlobalConfigResponse(
log_level="INFO",
log_target="SYSLOG",
db_purge_age=86400,
db_max_matches=10,
)
assert response.log_target == "SYSLOG"
def test_global_config_response_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse accepts validated log target paths."""
from app.models.config import GlobalConfigResponse
response = GlobalConfigResponse(
log_level="INFO",
log_target="/var/log/fail2ban.log",
db_purge_age=86400,
db_max_matches=10,
)
assert response.log_target == "/var/log/fail2ban.log"
def test_global_config_response_log_target_invalid_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse rejects log targets outside allowed directories."""
from app.models.config import GlobalConfigResponse
with pytest.raises(ValidationError) as exc_info:
GlobalConfigResponse(
log_level="INFO",
log_target="/root/secret.log",
db_purge_age=86400,
db_max_matches=10,
)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg

View File

@@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, patch
import pytest import pytest
from app.config import Settings
from app.models.config import ( from app.models.config import (
GlobalConfigUpdate, GlobalConfigUpdate,
JailConfigListResponse, JailConfigListResponse,
@@ -22,6 +23,25 @@ from app.services.config_service import (
JailNotFoundError, JailNotFoundError,
) )
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _mock_settings(monkeypatch: pytest.MonkeyPatch) -> None:
"""Mock get_settings for all tests in this module."""
def mock_get_settings() -> Settings:
return Settings(
database_path=":memory:",
fail2ban_socket="/tmp/fake.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use",
)
monkeypatch.setattr("app.models.config.get_settings", mock_get_settings)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -472,7 +492,7 @@ class TestUpdateGlobalConfig:
def __init__(self, **_kw: Any) -> None: def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send) self.send = AsyncMock(side_effect=_send)
update = GlobalConfigUpdate(log_level="debug", db_purge_age=3600) update = GlobalConfigUpdate(log_level="DEBUG", db_purge_age=3600)
with patch("app.services.config_service.Fail2BanClient", _FakeClient): with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_global_config(_SOCKET, update) await config_service.update_global_config(_SOCKET, update)
@@ -492,7 +512,7 @@ class TestUpdateGlobalConfig:
def __init__(self, **_kw: Any) -> None: def __init__(self, **_kw: Any) -> None:
self.send = AsyncMock(side_effect=_send) self.send = AsyncMock(side_effect=_send)
update = GlobalConfigUpdate(log_level="debug") update = GlobalConfigUpdate(log_level="DEBUG")
with patch("app.services.config_service.Fail2BanClient", _FakeClient): with patch("app.services.config_service.Fail2BanClient", _FakeClient):
await config_service.update_global_config(_SOCKET, update) await config_service.update_global_config(_SOCKET, update)