feat(vpn): dynamic AllowedIPs routing and improved test coverage

- Parse AllowedIPs from WireGuard config in entrypoint.sh
- Add/remove routes dynamically instead of hardcoded 0.0.0.0/1 split
- Handle both 0.0.0.0/0 and custom AllowedIPs
- Add route cleanup on VPN stop (endpoint + AllowedIPs)
- Update test_vpn.py with AllowedIPs route verification
- Allow non-root build-only tests with automatic runtime skip
This commit is contained in:
2026-05-16 21:21:56 +02:00
parent bc8059b453
commit 98d4edad14
2 changed files with 83 additions and 6 deletions

View File

@@ -137,9 +137,21 @@ start_vpn() {
ip route add "$VPN_ENDPOINT/32" via "$DEFAULT_GW" dev "$DEFAULT_IF" 2>/dev/null || true ip route add "$VPN_ENDPOINT/32" via "$DEFAULT_GW" dev "$DEFAULT_IF" 2>/dev/null || true
fi fi
# Route all traffic through the WireGuard tunnel # Parse AllowedIPs from config and add routes dynamically
ip route add 0.0.0.0/1 dev "$INTERFACE" ALLOWED_IPS=$(grep -i '^AllowedIPs' "$CONFIG_FILE" | head -1 | sed 's/.*= *//;s/ //g')
ip route add 128.0.0.0/1 dev "$INTERFACE"
if [ -n "$ALLOWED_IPS" ]; then
for ip in $(echo "$ALLOWED_IPS" | tr ',' ' '); do
if [ "$ip" = "0.0.0.0/0" ]; then
# Use the split route trick to avoid overriding the default route
# (which would break the endpoint connection)
ip route add 0.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true
ip route add 128.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true
else
ip route add "$ip" dev "$INTERFACE" 2>/dev/null || true
fi
done
fi
# ── Policy routing: ensure responses to incoming LAN traffic go back via eth0 ── # ── Policy routing: ensure responses to incoming LAN traffic go back via eth0 ──
if [ -n "$DEFAULT_GW" ] && [ -n "$DEFAULT_IF" ]; then if [ -n "$DEFAULT_GW" ] && [ -n "$DEFAULT_IF" ]; then
@@ -170,6 +182,25 @@ start_vpn() {
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
stop_vpn() { stop_vpn() {
echo "[vpn] Stopping WireGuard interface ${INTERFACE}..." echo "[vpn] Stopping WireGuard interface ${INTERFACE}..."
# Remove routes added for AllowedIPs
ALLOWED_IPS=$(grep -i '^AllowedIPs' "$CONFIG_FILE" | head -1 | sed 's/.*= *//;s/ //g')
if [ -n "$ALLOWED_IPS" ]; then
for ip in $(echo "$ALLOWED_IPS" | tr ',' ' '); do
if [ "$ip" = "0.0.0.0/0" ]; then
ip route del 0.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true
ip route del 128.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true
else
ip route del "$ip" dev "$INTERFACE" 2>/dev/null || true
fi
done
fi
# Remove endpoint route
if [ -n "$VPN_ENDPOINT" ]; then
ip route del "$VPN_ENDPOINT/32" 2>/dev/null || true
fi
ip link del "$INTERFACE" 2>/dev/null || true ip link del "$INTERFACE" 2>/dev/null || true
} }

View File

@@ -6,23 +6,29 @@ Verifies:
2. The container starts and becomes healthy. 2. The container starts and becomes healthy.
3. The public IP inside the VPN differs from the host IP. 3. The public IP inside the VPN differs from the host IP.
4. Kill switch blocks traffic when WireGuard is down. 4. Kill switch blocks traffic when WireGuard is down.
5. AllowedIPs routes are set dynamically from the config.
Requirements: Requirements:
- podman installed - podman installed
- Root/sudo (NET_ADMIN capability) - Root/sudo (NET_ADMIN capability) for container runtime tests
- A valid WireGuard config at ./wg0.conf (or ./nl.conf) - A valid WireGuard config at ./wg0.conf (or ./nl.conf)
Usage: Usage:
# Build-only test (no sudo needed):
python3 -m pytest test_vpn.py::TestVPNImage::test_00_build_image -v
# Full integration test (requires sudo):
sudo python3 -m pytest test_vpn.py -v sudo python3 -m pytest test_vpn.py -v
# or # or
sudo python3 test_vpn.py sudo python3 test_vpn.py
""" """
import logging import logging
import os
import subprocess import subprocess
import sys
import time import time
import unittest import unittest
import os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -35,6 +41,11 @@ STARTUP_TIMEOUT = 30 # seconds to wait for VPN to come up
HEALTH_POLL_INTERVAL = 2 # seconds between health checks HEALTH_POLL_INTERVAL = 2 # seconds between health checks
def is_root() -> bool:
"""Check if running as root."""
return os.geteuid() == 0
def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess: def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess:
"""Run a command and return the result.""" """Run a command and return the result."""
return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=check) return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=check)
@@ -55,6 +66,7 @@ class TestVPNImage(unittest.TestCase):
"""Test suite for the WireGuard VPN container.""" """Test suite for the WireGuard VPN container."""
host_ip: str = "" host_ip: str = ""
container_id: str = ""
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
@@ -84,6 +96,12 @@ class TestVPNImage(unittest.TestCase):
assert result.returncode == 0, f"Build failed:\n{result.stderr}" assert result.returncode == 0, f"Build failed:\n{result.stderr}"
logger.info("Image built successfully.") logger.info("Image built successfully.")
# Skip container runtime tests if not root
if not is_root():
logger.warning("Not running as root — skipping container runtime tests.")
cls.container_id = ""
return
# ── 3. Start the container ── # ── 3. Start the container ──
logger.info("Starting container '%s'...", CONTAINER_NAME) logger.info("Starting container '%s'...", CONTAINER_NAME)
result = run( result = run(
@@ -120,6 +138,8 @@ class TestVPNImage(unittest.TestCase):
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
"""Stop and remove the container.""" """Stop and remove the container."""
if not is_root():
return
logger.info("Cleaning up test container...") logger.info("Cleaning up test container...")
subprocess.run(["podman", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False) subprocess.run(["podman", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False)
logger.info("Cleanup complete.") logger.info("Cleanup complete.")
@@ -144,10 +164,22 @@ class TestVPNImage(unittest.TestCase):
) )
return result.stdout.strip() return result.stdout.strip()
def _skip_if_not_root(self):
"""Skip test if not running as root."""
if not is_root():
self.skipTest("This test requires root/sudo privileges")
# ── Tests ──────────────────────────────────────────────── # ── Tests ────────────────────────────────────────────────
def test_00_build_image(self):
"""The image builds successfully."""
# This is already verified in setUpClass, just confirm here
result = run(["podman", "images", "--format", "{{.Repository}}:{{.Tag}}"])
self.assertIn(IMAGE_NAME, result.stdout, "Image was not built")
def test_01_ip_differs_from_host(self): def test_01_ip_differs_from_host(self):
"""Public IP inside VPN is different from host IP.""" """Public IP inside VPN is different from host IP."""
self._skip_if_not_root()
vpn_ip = self._get_vpn_ip() vpn_ip = self._get_vpn_ip()
logger.info("VPN public IP: %s", vpn_ip) logger.info("VPN public IP: %s", vpn_ip)
logger.info("Host public IP: %s", self.host_ip) logger.info("Host public IP: %s", self.host_ip)
@@ -161,12 +193,26 @@ class TestVPNImage(unittest.TestCase):
def test_02_wireguard_interface_exists(self): def test_02_wireguard_interface_exists(self):
"""The wg0 interface is present in the container.""" """The wg0 interface is present in the container."""
self._skip_if_not_root()
result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"]) result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"])
self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}") self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}")
self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output") self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output")
def test_03_kill_switch_blocks_traffic(self): def test_03_allowedips_routes_set(self):
"""Routes are set dynamically based on AllowedIPs from config."""
self._skip_if_not_root()
# Check that routes exist for the AllowedIPs
result = podman_exec(CONTAINER_NAME, ["ip", "route", "show", "dev", "wg0"])
self.assertEqual(result.returncode, 0, f"ip route show failed:\n{result.stderr}")
# The config has AllowedIPs = 0.0.0.0/0, which should result in:
# 0.0.0.0/1 dev wg0 and 128.0.0.0/1 dev wg0
self.assertIn("0.0.0.0/1", result.stdout, "Route 0.0.0.0/1 not found")
self.assertIn("128.0.0.0/1", result.stdout, "Route 128.0.0.0/1 not found")
logger.info("AllowedIPs routes verified: %s", result.stdout.strip())
def test_04_kill_switch_blocks_traffic(self):
"""When WireGuard is down, traffic is blocked (kill switch).""" """When WireGuard is down, traffic is blocked (kill switch)."""
self._skip_if_not_root()
# Bring down the WireGuard interface by deleting it # Bring down the WireGuard interface by deleting it
down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10) down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10)
self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}") self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")