Refactor: Make model packages true leaf nodes - remove app-layer dependencies

Models in app/models/ are now pure data classes with no cross-layer dependencies.
This ensures the models layer remains a true leaf node in the dependency graph.

Changes:
- Create app/models/_common.py with shared types (TimeRange, bucket_count, constants)
- Move TimeRange and time-range constants from ban.py to _common.py
- Update history.py, routers, and services to import from _common.py
- Remove imports from app.config and app.utils from config.py models
- Move field validators from models to router layer:
  - Add log_target validation in config_misc router
  - Add log_path validation in jail_config router
- Update test_models.py to reflect validators moved to router layer
- Update documentation (Architekture.md, Backend-Development.md) with model layering rules
- Fix import ordering and type annotations in affected files

Model layering rule: Models may only import from:
✓ Standard library and third-party packages (Pydantic, typing)
✓ Other models in app/models/ (sibling models)
✓ app.models.response (response envelopes)
✗ app.services, app.config, app.utils, or any application layer

Validation requiring app-level state (settings, allowed directories) now happens
at the router or service layer, not in model validators.

Fixes: Models were not true leaf nodes due to circular imports and app-layer dependencies

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-30 19:31:11 +02:00
parent 3d1a6f5538
commit 100fd47c4b
15 changed files with 1542 additions and 396 deletions

View File

@@ -491,6 +491,13 @@ Pydantic schemas that define data shapes and validation. Models are split into t
| `server.py` | Server status and settings models |
| `setup.py` | First-run setup wizard models |
**Model Layering Rules:** Models are pure data classes (leaf nodes) in the dependency graph. They must not import from application-layer modules (`app.services`, `app.config`, `app.utils`). Models may import from:
- Standard library and third-party packages (Pydantic, typing)
- Other models in `app.models/` (sibling models)
- `app.models.response` (response envelopes)
Validation that requires access to app-level state (e.g., allowed log directories) must be moved to the router or service layer, not in model validators.
#### Tasks (`app/tasks/`)
APScheduler background jobs that run on a schedule without user interaction.

View File

@@ -742,6 +742,11 @@ This policy eliminates a whole class of frontendbackend contract bugs. If the
- Validate at the boundary — once data enters a Pydantic model it is trusted.
- Use `Field(...)` with descriptions for every field to keep auto-generated docs useful.
- Separate **request models**, **response models**, and **domain (internal) models** — do not reuse one model for all three.
- **Models are leaf nodes**: Models in `app/models/` must not import from application-layer modules (`app.services`, `app.config`, `app.utils`). Models may only import from:
- Standard library and third-party packages (Pydantic, typing)
- Other models in `app/models/` (sibling models)
- `app.models.response` (response envelopes)
- Validation that requires app-level state (e.g., `settings`, allowed directories) must happen at the router or service layer, never in model validators.
### Using `Literal` Types for Constrained Strings
@@ -765,27 +770,34 @@ This provides:
- **API documentation** — OpenAPI docs automatically list all allowed values.
- **Validation** — Pydantic rejects invalid values and provides a clear error message.
### Custom Field Validators
### Field Validators and Validation Placement
For fields that require complex validation (e.g., file paths that must be within allowed directories), use `@field_validator`:
Field validators in models should only contain logic that is **stateless and does not depend on application configuration or state**. Validators must not import from `app.config`, `app.utils`, or `app.services`.
For validation that depends on app-level state (e.g., file paths that must be within allowed directories), perform validation in the router or service layer:
```python
from pydantic import field_validator
# ✅ Good: Validation in router (has access to settings)
from fastapi import APIRouter
from app.config import get_settings
from app.utils.path_utils import validate_log_path
class AddLogPathRequest(BaseModel):
log_path: str = Field(..., description="Absolute path to the log file to monitor.")
@router.post("/jails/{name}/logpath")
async def add_log_path(name: str, body: AddLogPathRequest) -> None:
# Validate before using
validate_log_path(body.log_path)
await config_service.add_log_path(socket_path, name, body)
@field_validator("log_path", mode="after")
@classmethod
def validate_log_path_field(cls, value: str) -> str:
"""Validate that the log path is within allowed directories."""
return validate_log_path(value)
# ❌ Avoid: Importing from app layer in model validators
# Do NOT do this in app/models/config.py:
# from app.config import get_settings
# @field_validator("log_path")
# def validate_log_path_field(cls, value: str) -> str:
# settings = get_settings() # ← Models must not import from app layer
# ...
```
**Path Validation Helper:**
For query parameters and other contexts where Pydantic validators cannot be used directly, use the `validate_log_path()` helper from `app.utils.path_utils`:
**Common Helper:** For shared path validation logic, use the `validate_log_path()` helper from `app.utils.path_utils` in your router or service, not in model validators.
```python
from fastapi import HTTPException, status

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,49 @@
"""Shared types and constants used across multiple model modules.
This module defines types and constants that are used by multiple
model modules, ensuring a single source of truth for cross-model types.
"""
import math
from typing import Literal
#: The four supported time-range presets for the dashboard views.
TimeRange = Literal["24h", "7d", "30d", "365d"]
#: Number of seconds represented by each preset.
TIME_RANGE_SECONDS: dict[str, int] = {
"24h": 24 * 3600,
"7d": 7 * 24 * 3600,
"30d": 30 * 24 * 3600,
"365d": 365 * 24 * 3600,
}
#: Bucket size in seconds for each time-range preset.
BUCKET_SECONDS: dict[str, int] = {
"24h": 3_600, # 1 hour → 24 buckets
"7d": 6 * 3_600, # 6 hours → 28 buckets
"30d": 86_400, # 1 day → 30 buckets
"365d": 7 * 86_400, # 7 days → ~53 buckets
}
#: Human-readable bucket size label for each time-range preset.
BUCKET_SIZE_LABEL: dict[str, str] = {
"24h": "1h",
"7d": "6h",
"30d": "1d",
"365d": "7d",
}
def bucket_count(range_: TimeRange) -> int:
"""Return the number of buckets needed to cover *range_* completely.
Args:
range_: One of the supported time-range presets.
Returns:
Ceiling division of the range duration by the bucket size so that
the last bucket is included even when the window is not an exact
multiple of the bucket size.
"""
return math.ceil(TIME_RANGE_SECONDS[range_] / BUCKET_SECONDS[range_])

View File

@@ -3,7 +3,6 @@
Request, response, and domain models used by the ban router and service.
"""
import math
from typing import Literal
from pydantic import Field
@@ -11,21 +10,6 @@ from pydantic import Field
from app.models.response import BanGuiBaseModel, CollectionResponse, PaginatedListResponse
# ---------------------------------------------------------------------------
# Time-range selector
# ---------------------------------------------------------------------------
#: The four supported time-range presets for the dashboard views.
TimeRange = Literal["24h", "7d", "30d", "365d"]
#: Number of seconds represented by each preset.
TIME_RANGE_SECONDS: dict[str, int] = {
"24h": 24 * 3600,
"7d": 7 * 24 * 3600,
"30d": 30 * 24 * 3600,
"365d": 365 * 24 * 3600,
}
class BanRequest(BanGuiBaseModel):
"""Payload for ``POST /api/bans`` (ban an IP)."""
@@ -204,36 +188,7 @@ class BansByCountryResponse(BanGuiBaseModel):
# ---------------------------------------------------------------------------
# Trend endpoint models
# ---------------------------------------------------------------------------
#: Bucket size in seconds for each time-range preset.
BUCKET_SECONDS: dict[str, int] = {
"24h": 3_600, # 1 hour → 24 buckets
"7d": 6 * 3_600, # 6 hours → 28 buckets
"30d": 86_400, # 1 day → 30 buckets
"365d": 7 * 86_400, # 7 days → ~53 buckets
}
#: Human-readable bucket size label for each time-range preset.
BUCKET_SIZE_LABEL: dict[str, str] = {
"24h": "1h",
"7d": "6h",
"30d": "1d",
"365d": "7d",
}
def bucket_count(range_: TimeRange) -> int:
"""Return the number of buckets needed to cover *range_* completely.
Args:
range_: One of the supported time-range presets.
Returns:
Ceiling division of the range duration by the bucket size so that
the last bucket is included even when the window is not an exact
multiple of the bucket size.
"""
return math.ceil(TIME_RANGE_SECONDS[range_] / BUCKET_SECONDS[range_])
class BanTrendBucket(BanGuiBaseModel):
"""A single time bucket in the ban trend series."""

View File

@@ -4,14 +4,11 @@ Request, response, and domain models for the config router and service.
"""
import datetime
from pathlib import Path
from typing import Literal
from pydantic import Field, field_validator
from pydantic import Field
from app.config import get_settings
from app.models.response import BanGuiBaseModel, CollectionResponse
from app.utils.path_utils import validate_log_path
DNSMode = Literal["yes", "warn", "no", "raw"]
LogEncoding = Literal["auto", "ascii", "utf-8", "UTF-8", "latin-1"]
@@ -158,42 +155,6 @@ class GlobalConfigResponse(BanGuiBaseModel):
db_purge_age: int = Field(..., description="Seconds after which ban records are purged from the fail2ban DB.")
db_max_matches: int = Field(..., description="Maximum stored log-line matches per ban record.")
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str) -> str:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate.
Returns:
The validated log target.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
class GlobalConfigUpdate(BanGuiBaseModel):
"""Payload for ``PUT /api/config/global``."""
@@ -208,45 +169,6 @@ class GlobalConfigUpdate(BanGuiBaseModel):
db_purge_age: int | None = Field(default=None, ge=0)
db_max_matches: int | None = Field(default=None, ge=0)
@field_validator("log_target", mode="after")
@classmethod
def validate_log_target(cls, value: str | None) -> str | None:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate, or None.
Returns:
The validated log target, or None if input was None.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value is None:
return None
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return value
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return value
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
# ---------------------------------------------------------------------------
# Log observation / preview models
# ---------------------------------------------------------------------------
@@ -260,22 +182,6 @@ class AddLogPathRequest(BanGuiBaseModel):
description="If true, monitor from current end of file (tail). If false, read from the beginning.",
)
@field_validator("log_path", mode="after")
@classmethod
def validate_log_path_field(cls, value: str) -> str:
"""Validate that the log path is within allowed directories.
Args:
value: The log path to validate.
Returns:
The validated log path.
Raises:
ValueError: If the path is outside allowed log directories.
"""
return validate_log_path(value)
class LogPreviewRequest(BanGuiBaseModel):
"""Payload for ``POST /api/config/preview-log``."""

View File

@@ -7,10 +7,9 @@ from __future__ import annotations
from pydantic import Field
from app.models._common import TimeRange
from app.models.response import BanGuiBaseModel, PaginatedListResponse
from app.models.ban import TimeRange
__all__ = [
"HistoryBanItem",
"HistoryListResponse",

View File

@@ -1,11 +1,13 @@
from __future__ import annotations
import shlex
from pathlib import Path
from typing import Annotated
import structlog
from fastapi import APIRouter, Query, Request, status
from app.config import get_settings
from app.dependencies import (
AuthDep,
Fail2BanSocketDep,
@@ -36,6 +38,38 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger()
router: APIRouter = APIRouter(tags=["Config Misc"])
def _validate_log_target(value: str) -> None:
"""Validate that log_target is either a special value or a valid file path.
Args:
value: The log target to validate.
Raises:
ValueError: If the target is not a special value and not in allowed directories.
"""
if value.upper() in ("STDOUT", "STDERR", "SYSLOG"):
return
settings = get_settings()
try:
resolved_path = Path(value).resolve()
except (OSError, RuntimeError) as e:
raise ValueError(f"Cannot resolve path {value!r}: {e}") from e
for allowed_dir in settings.allowed_log_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
resolved_path.relative_to(allowed_path)
return
except ValueError:
continue
allowed_dirs_str = ", ".join(settings.allowed_log_dirs)
raise ValueError(
f"Log target {value!r} is outside allowed directories: {allowed_dirs_str}"
)
@router.get(
"/global",
response_model=GlobalConfigResponse,
@@ -82,9 +116,11 @@ async def update_global_config(
body: Partial update — only non-None fields are written.
Raises:
HTTPException: 400 when a set command is rejected.
HTTPException: 400 when a set command is rejected or log_target is invalid.
HTTPException: 502 when fail2ban is unreachable.
"""
if body.log_target is not None:
_validate_log_target(body.log_target)
await config_service.update_global_config(socket_path, body)

View File

@@ -25,21 +25,21 @@ from app.dependencies import (
HttpSessionDep,
ServerStatusDep,
)
from app.mappers import (
map_domain_ban_trend_to_response,
map_domain_bans_by_country_to_response,
map_domain_bans_by_jail_to_response,
map_domain_dashboard_ban_list_to_response,
)
from app.models._common import TimeRange
from app.models.ban import (
BanOrigin,
BansByCountryResponse,
BansByJailResponse,
BanTrendResponse,
DashboardBanListResponse,
TimeRange,
)
from app.models.server import ServerStatus, ServerStatusResponse
from app.mappers import (
map_domain_dashboard_ban_list_to_response,
map_domain_bans_by_country_to_response,
map_domain_ban_trend_to_response,
map_domain_bans_by_jail_to_response,
)
from app.services import ban_service
from app.utils.constants import DEFAULT_PAGE_SIZE

View File

@@ -27,7 +27,8 @@ from app.dependencies import (
HttpSessionDep,
)
from app.exceptions import HistoryNotFoundError
from app.models.ban import BanOrigin, TimeRange
from app.models._common import TimeRange
from app.models.ban import BanOrigin
from app.models.history import HistoryListResponse, IpDetailResponse
from app.services import history_service
from app.utils.constants import DEFAULT_PAGE_SIZE

View File

@@ -219,9 +219,10 @@ async def add_log_path(
Raises:
HTTPException: 404 when the jail does not exist.
HTTPException: 400 when the command is rejected.
HTTPException: 400 when the command is rejected or path is invalid.
HTTPException: 502 when fail2ban is unreachable.
"""
validate_log_path(body.log_path)
await config_service.add_log_path(socket_path, name, body)

View File

@@ -19,15 +19,17 @@ import aiohttp
import structlog
from app.exceptions import JailNotFoundError, JailOperationError
from app.models.ban import (
BLOCKLIST_JAIL,
from app.models._common import (
BUCKET_SECONDS,
BUCKET_SIZE_LABEL,
BanOrigin,
TimeRange,
_derive_origin,
bucket_count,
)
from app.models.ban import (
BLOCKLIST_JAIL,
BanOrigin,
_derive_origin,
)
from app.models.ban_domain import (
DomainActiveBan,
DomainActiveBanList,
@@ -320,7 +322,7 @@ async def get_active_bans(
except (TimeoutError, aiohttp.ClientError, OSError):
log.warning("active_bans_batch_geo_failed")
geo_map = {}
enriched: list[ActiveBan] = []
enriched: list[DomainActiveBan] = []
for ban in bans:
geo = geo_map.get(ban.ip)
if geo is not None:

View File

@@ -15,12 +15,12 @@ from typing import TYPE_CHECKING
import structlog
from app.models.ban import BanOrigin, TimeRange
if TYPE_CHECKING:
import aiohttp
import aiosqlite
from app.models._common import TimeRange
from app.models.ban import BanOrigin
from app.models.geo import GeoEnricher, GeoInfo
from app.repositories.protocols import HistoryArchiveRepository
from app.services.protocols import Fail2BanMetadataService

View File

@@ -14,8 +14,9 @@ if TYPE_CHECKING:
import aiohttp
import aiosqlite
from app.models._common import TimeRange
from app.models.auth import Session
from app.models.ban import BanOrigin, JailBannedIpsResponse, TimeRange
from app.models.ban import BanOrigin, JailBannedIpsResponse
from app.models.blocklist import (
BlocklistSource,
ImportLogListResponse,

View File

@@ -1,146 +1,26 @@
"""Unit tests for Pydantic models and their validators."""
import tempfile
from pathlib import Path
import pytest
from pydantic import ValidationError
from app.config import Settings
from app.models.config import AddLogPathRequest
from app.models.config import GlobalConfigUpdate, GlobalConfigResponse
@pytest.fixture
def _mock_allowed_dirs(monkeypatch: pytest.MonkeyPatch) -> None:
"""Mock get_settings to return test settings with default allowed directories."""
def mock_get_settings() -> Settings:
return Settings(
database_path=":memory:",
fail2ban_socket="/tmp/fake.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="a" * 32,
)
monkeypatch.setattr("app.models.config.get_settings", mock_get_settings)
monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings)
def test_add_log_path_request_valid_in_var_log(_mock_allowed_dirs: None) -> None:
"""Valid log paths in /var/log are accepted."""
req = AddLogPathRequest(log_path="/var/log/auth.log", tail=True)
assert req.log_path == "/var/log/auth.log"
assert req.tail is True
def test_add_log_path_request_valid_in_config_log(_mock_allowed_dirs: None) -> None:
"""Valid log paths in /config/log are accepted."""
req = AddLogPathRequest(log_path="/config/log/app.log", tail=False)
assert req.log_path == "/config/log/app.log"
assert req.tail is False
def test_add_log_path_request_valid_with_subdirectory(_mock_allowed_dirs: None) -> None:
"""Log paths in subdirectories of allowed paths are accepted."""
req = AddLogPathRequest(log_path="/var/log/syslog/auth.log", tail=True)
assert req.log_path == "/var/log/syslog/auth.log"
def test_add_log_path_request_rejects_path_outside_allowed(_mock_allowed_dirs: None) -> None:
"""Paths outside allowed directories are rejected."""
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path="/etc/passwd", tail=True)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
assert "/etc/passwd" in error_msg
def test_add_log_path_request_rejects_home_directory(_mock_allowed_dirs: None) -> None:
"""Paths in home directories are rejected."""
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path="/home/user/app.log", tail=True)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_add_log_path_request_rejects_shadow_file(_mock_allowed_dirs: None) -> None:
"""Paths to sensitive files like /etc/shadow are rejected."""
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path="/etc/shadow", tail=True)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_add_log_path_request_rejects_symlink_escape(monkeypatch: pytest.MonkeyPatch) -> None:
"""Symlinks that escape allowed directories are rejected."""
with tempfile.TemporaryDirectory() as tmpdir:
allowed_dir = Path(tmpdir) / "allowed"
escape_dir = Path(tmpdir) / "escape"
allowed_dir.mkdir()
escape_dir.mkdir()
symlink = allowed_dir / "escape_link"
symlink.symlink_to(escape_dir)
def mock_get_settings() -> Settings:
return Settings(
database_path=":memory:",
fail2ban_socket="/tmp/fake.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use",
allowed_log_dirs=[str(allowed_dir)],
)
monkeypatch.setattr("app.models.config.get_settings", mock_get_settings)
monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings)
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path=str(symlink / "evil.log"), tail=True)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_add_log_path_request_validates_startswith_bypass(_mock_allowed_dirs: None) -> None:
"""Paths like /var/log_evil that bypass startswith() are rejected."""
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path="/var/log_evil/somefile.log", tail=True)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_add_log_path_request_default_tail_is_true(_mock_allowed_dirs: None) -> None:
def test_add_log_path_request_default_tail_is_true() -> None:
"""Tail defaults to True."""
from app.models.config import AddLogPathRequest
req = AddLogPathRequest(log_path="/var/log/app.log")
assert req.tail is True
def test_add_log_path_request_error_message_lists_allowed_dirs(_mock_allowed_dirs: None) -> None:
"""Error message includes the list of allowed directories."""
with pytest.raises(ValidationError) as exc_info:
AddLogPathRequest(log_path="/root/secret.log", tail=True)
error_msg = str(exc_info.value)
assert "/var/log" in error_msg
assert "/config/log" in error_msg
def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatch) -> None:
"""Custom allowed directories from settings are respected."""
def mock_get_settings() -> Settings:
return Settings(
database_path=":memory:",
fail2ban_socket="/tmp/fake.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use",
allowed_log_dirs=["/custom/logs", "/another/path"],
)
monkeypatch.setattr("app.models.config.get_settings", mock_get_settings)
monkeypatch.setattr("app.utils.path_utils.get_settings", mock_get_settings)
req = AddLogPathRequest(log_path="/custom/logs/app.log", tail=True)
assert req.log_path == "/custom/logs/app.log"
with pytest.raises(ValidationError):
AddLogPathRequest(log_path="/var/log/app.log", tail=True)
def test_add_log_path_request_can_be_created() -> None:
"""AddLogPathRequest can be created with valid data (no validators in model)."""
from app.models.config import AddLogPathRequest
req = AddLogPathRequest(log_path="/etc/passwd", tail=True)
# Note: path validation is now in the router layer, not in the model
assert req.log_path == "/etc/passwd"
assert req.tail is True
@@ -149,29 +29,23 @@ def test_add_log_path_request_custom_allowed_dirs(monkeypatch: pytest.MonkeyPatc
# ---------------------------------------------------------------------------
def test_global_config_update_valid_log_level(_mock_allowed_dirs: None) -> None:
def test_global_config_update_valid_log_level() -> None:
"""GlobalConfigUpdate accepts valid log levels."""
from app.models.config import GlobalConfigUpdate
for level in ["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG"]:
update = GlobalConfigUpdate(log_level=level)
assert update.log_level == level
def test_global_config_update_invalid_log_level(_mock_allowed_dirs: None) -> None:
def test_global_config_update_invalid_log_level() -> None:
"""GlobalConfigUpdate rejects invalid log levels."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError) as exc_info:
GlobalConfigUpdate(log_level="invalid")
error_msg = str(exc_info.value)
assert "CRITICAL" in error_msg
def test_global_config_update_log_level_case_sensitive(_mock_allowed_dirs: None) -> None:
def test_global_config_update_log_level_case_sensitive() -> None:
"""GlobalConfigUpdate log_level is case-sensitive (must be uppercase)."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError):
GlobalConfigUpdate(log_level="debug")
@@ -179,60 +53,29 @@ def test_global_config_update_log_level_case_sensitive(_mock_allowed_dirs: None)
GlobalConfigUpdate(log_level="Debug")
def test_global_config_update_valid_log_target_special(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate accepts special log target values."""
from app.models.config import GlobalConfigUpdate
def test_global_config_update_can_set_log_target() -> None:
"""GlobalConfigUpdate can set log_target (validation moved to router)."""
# Note: path validation for log_target is now in the router layer
update = GlobalConfigUpdate(log_target="/etc/passwd")
assert update.log_target == "/etc/passwd"
def test_global_config_update_special_log_targets() -> None:
"""GlobalConfigUpdate accepts special log target values (no validation in model)."""
for target in ["STDOUT", "STDERR", "SYSLOG"]:
update = GlobalConfigUpdate(log_target=target)
assert update.log_target == target
def test_global_config_update_valid_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate accepts log targets that are valid file paths."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate(log_target="/var/log/fail2ban.log")
assert update.log_target == "/var/log/fail2ban.log"
update = GlobalConfigUpdate(log_target="/config/log/app.log")
assert update.log_target == "/config/log/app.log"
def test_global_config_update_invalid_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate rejects log targets outside allowed directories."""
from app.models.config import GlobalConfigUpdate
with pytest.raises(ValidationError) as exc_info:
GlobalConfigUpdate(log_target="/etc/passwd")
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
def test_global_config_update_log_target_case_insensitive(_mock_allowed_dirs: None) -> None:
"""GlobalConfigUpdate special log targets are accepted in any case."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate(log_target="stdout")
assert update.log_target == "stdout"
update = GlobalConfigUpdate(log_target="STDERR")
assert update.log_target == "STDERR"
def test_global_config_update_none_fields(_mock_allowed_dirs: None) -> None:
def test_global_config_update_none_fields() -> None:
"""GlobalConfigUpdate allows None for optional fields."""
from app.models.config import GlobalConfigUpdate
update = GlobalConfigUpdate()
assert update.log_level is None
assert update.log_target is None
def test_global_config_response_log_level(_mock_allowed_dirs: None) -> None:
def test_global_config_response_log_level() -> None:
"""GlobalConfigResponse enforces valid log levels."""
from app.models.config import GlobalConfigResponse
response = GlobalConfigResponse(
log_level="INFO",
log_target="STDOUT",
@@ -250,10 +93,8 @@ def test_global_config_response_log_level(_mock_allowed_dirs: None) -> None:
)
def test_global_config_response_log_target_special(_mock_allowed_dirs: None) -> None:
def test_global_config_response_log_target_special() -> None:
"""GlobalConfigResponse accepts special log targets."""
from app.models.config import GlobalConfigResponse
response = GlobalConfigResponse(
log_level="INFO",
log_target="SYSLOG",
@@ -263,32 +104,15 @@ def test_global_config_response_log_target_special(_mock_allowed_dirs: None) ->
assert response.log_target == "SYSLOG"
def test_global_config_response_log_target_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse accepts validated log target paths."""
from app.models.config import GlobalConfigResponse
def test_global_config_response_log_target_path() -> None:
"""GlobalConfigResponse accepts any log target path (validation in router)."""
response = GlobalConfigResponse(
log_level="INFO",
log_target="/var/log/fail2ban.log",
log_target="/root/secret.log",
db_purge_age=86400,
db_max_matches=10,
)
assert response.log_target == "/var/log/fail2ban.log"
def test_global_config_response_log_target_invalid_path(_mock_allowed_dirs: None) -> None:
"""GlobalConfigResponse rejects log targets outside allowed directories."""
from app.models.config import GlobalConfigResponse
with pytest.raises(ValidationError) as exc_info:
GlobalConfigResponse(
log_level="INFO",
log_target="/root/secret.log",
db_purge_age=86400,
db_max_matches=10,
)
error_msg = str(exc_info.value)
assert "outside allowed directories" in error_msg
assert response.log_target == "/root/secret.log"
# ---------------------------------------------------------------------------