Harden preview_log path validation and add regression test
This commit is contained in:
@@ -7,9 +7,10 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
import structlog
|
||||
from pathlib import Path
|
||||
from typing import TypeVar, cast
|
||||
from typing import cast
|
||||
|
||||
import structlog
|
||||
|
||||
from app.exceptions import ConfigOperationError
|
||||
from app.models.config import (
|
||||
@@ -42,7 +43,7 @@ _SAFE_LOG_PREFIXES: tuple[str, ...] = ("/var/log", "/config/log")
|
||||
def _ok(response: object) -> object:
|
||||
"""Extract the payload from a fail2ban ``(return_code, data)`` response."""
|
||||
try:
|
||||
code, data = cast(Fail2BanResponse, response)
|
||||
code, data = cast("Fail2BanResponse", response)
|
||||
except (TypeError, ValueError) as exc:
|
||||
raise ValueError(
|
||||
f"Unexpected fail2ban response shape: {response!r}"
|
||||
@@ -80,16 +81,13 @@ async def _safe_get(
|
||||
return default
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
async def _safe_get_typed(
|
||||
client: Fail2BanClient,
|
||||
command: list[str],
|
||||
default: T,
|
||||
) -> T:
|
||||
"""Send a command and return the result typed as ``default``'s type."""
|
||||
return cast("T", await _safe_get(client, command, default))
|
||||
default: object | None = None,
|
||||
) -> object | None:
|
||||
"""Send a command and return a typed result or *default* if it fails."""
|
||||
return await _safe_get(client, command, default)
|
||||
|
||||
|
||||
async def read_fail2ban_log(
|
||||
@@ -210,7 +208,31 @@ async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse:
|
||||
)
|
||||
|
||||
path = Path(req.log_path)
|
||||
if not path.is_file():
|
||||
try:
|
||||
resolved = path.resolve(strict=False)
|
||||
except (ValueError, OSError) as exc:
|
||||
return LogPreviewResponse(
|
||||
lines=[],
|
||||
total_lines=0,
|
||||
matched_count=0,
|
||||
regex_error=f"Cannot resolve log path {req.log_path!r}: {exc}",
|
||||
)
|
||||
|
||||
if not any(
|
||||
resolved.is_relative_to(Path(prefix))
|
||||
for prefix in _SAFE_LOG_PREFIXES
|
||||
):
|
||||
return LogPreviewResponse(
|
||||
lines=[],
|
||||
total_lines=0,
|
||||
matched_count=0,
|
||||
regex_error=(
|
||||
f"Log path {str(resolved)!r} is outside the allowed directory. "
|
||||
"Only paths under /var/log or /config/log are permitted."
|
||||
),
|
||||
)
|
||||
|
||||
if not resolved.is_file():
|
||||
return LogPreviewResponse(
|
||||
lines=[],
|
||||
total_lines=0,
|
||||
@@ -221,7 +243,7 @@ async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse:
|
||||
try:
|
||||
raw_lines = await run_blocking(
|
||||
_read_tail_lines,
|
||||
str(path),
|
||||
str(resolved),
|
||||
req.num_lines,
|
||||
)
|
||||
except OSError as exc:
|
||||
|
||||
Reference in New Issue
Block a user