TASK-010: Replace .split() with shlex.split() for fail2ban_start_command

- Add @field_validator for fail2ban_start_command to validate with shlex.split()
  at startup, catching misconfigured commands with mismatched quotes
- Replace .split() with shlex.split() in jail_config.py line 450
- Replace .split() with shlex.split() in config_misc.py line 154
- Update Backend-Development.md with configuration documentation explaining
  quoted path handling and common pitfalls
- Add comprehensive test suite (8 tests) covering valid commands, quoted paths,
  and mismatched quote errors

This fix ensures commands like '/opt/my tools/fail2ban-client' start are
correctly parsed as two tokens instead of three, preventing execution failures
when the path contains spaces.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-26 13:04:14 +02:00
parent 4ab767e3d4
commit 8698b89f6a
6 changed files with 160 additions and 38 deletions

View File

@@ -4,6 +4,7 @@ Follows pydantic-settings patterns: all values are prefixed with BANGUI_
and validated at startup via the Settings singleton.
"""
import shlex
from typing import Literal
from pydantic import Field, field_validator
@@ -151,6 +152,32 @@ class Settings(BaseSettings):
),
)
@field_validator("fail2ban_start_command", mode="after")
@classmethod
def _validate_fail2ban_start_command(cls, value: str) -> str:
"""Validate fail2ban_start_command by attempting to parse it with shlex.
Ensures the command can be split into arguments without shell interpretation.
Raises ValueError if the command contains mismatched quotes.
Args:
value: The fail2ban start command string.
Returns:
The validated command string.
Raises:
ValueError: If the command contains mismatched quotes.
"""
try:
shlex.split(value)
except ValueError as e:
raise ValueError(
f"fail2ban_start_command contains mismatched quotes or is otherwise "
f"unparseable: {value!r}{e}"
) from e
return value
model_config = SettingsConfigDict(
env_prefix="BANGUI_",
env_file=".env",

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import shlex
from typing import Annotated
import structlog
@@ -149,7 +150,7 @@ async def restart_fail2ban(
``POST /api/config/jails/{name}/rollback``
if a specific jail is suspect.
"""
start_cmd_parts: list[str] = start_cmd.split()
start_cmd_parts: list[str] = shlex.split(start_cmd)
restarted = await jail_service.restart_daemon(
socket_path,

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import shlex
from typing import Annotated
from fastapi import APIRouter, Path, Query, Request, status
@@ -445,7 +446,7 @@ async def rollback_jail(
HTTPException: 400 if *name* contains invalid characters.
HTTPException: 500 if writing the .local override file fails.
"""
start_cmd_parts: list[str] = start_cmd.split()
start_cmd_parts: list[str] = shlex.split(start_cmd)
result = await jail_config_service.rollback_jail(config_dir, socket_path, name, start_cmd_parts)

View File

@@ -0,0 +1,107 @@
"""Unit tests for application configuration and validation."""
import pytest
from pydantic import ValidationError
from app.config import Settings
def test_fail2ban_start_command_validates_simple_command() -> None:
"""Simple fail2ban commands without special characters are accepted."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command="fail2ban-client start",
)
assert settings.fail2ban_start_command == "fail2ban-client start"
def test_fail2ban_start_command_validates_systemctl_command() -> None:
"""systemctl commands are accepted."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command="systemctl start fail2ban",
)
assert settings.fail2ban_start_command == "systemctl start fail2ban"
def test_fail2ban_start_command_accepts_quoted_paths() -> None:
"""Commands with quoted paths containing spaces are accepted."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command='"/opt/my tools/fail2ban-client" start',
)
assert settings.fail2ban_start_command == '"/opt/my tools/fail2ban-client" start'
def test_fail2ban_start_command_rejects_mismatched_quotes() -> None:
"""Commands with mismatched quotes raise ValidationError."""
with pytest.raises(ValidationError) as exc_info:
Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command='"/opt/my tools/fail2ban-client start',
)
error_msg = str(exc_info.value)
assert "fail2ban_start_command" in error_msg
assert "mismatched quotes" in error_msg or "No closing quotation" in error_msg
def test_fail2ban_start_command_error_includes_problematic_value() -> None:
"""Validation errors include the problematic command value."""
problematic_command = '"/opt/broken start'
with pytest.raises(ValidationError) as exc_info:
Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command=problematic_command,
)
error_msg = str(exc_info.value)
assert problematic_command in error_msg
def test_fail2ban_start_command_default_value_is_valid() -> None:
"""The default fail2ban_start_command value is valid and parseable."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
)
assert settings.fail2ban_start_command == "fail2ban-client start"
def test_fail2ban_start_command_single_quoted() -> None:
"""Commands with single quotes are accepted."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command="'/usr/bin/fail2ban-client' start",
)
assert settings.fail2ban_start_command == "'/usr/bin/fail2ban-client' start"
def test_fail2ban_start_command_multiple_arguments() -> None:
"""Commands with multiple arguments are accepted."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
fail2ban_start_command="fail2ban-client -c /etc/fail2ban start",
)
assert settings.fail2ban_start_command == "fail2ban-client -c /etc/fail2ban start"