diff --git a/Docker/entrypoint.sh b/Docker/entrypoint.sh index f3fa411..ac3887c 100644 --- a/Docker/entrypoint.sh +++ b/Docker/entrypoint.sh @@ -137,9 +137,21 @@ start_vpn() { ip route add "$VPN_ENDPOINT/32" via "$DEFAULT_GW" dev "$DEFAULT_IF" 2>/dev/null || true fi - # Route all traffic through the WireGuard tunnel - ip route add 0.0.0.0/1 dev "$INTERFACE" - ip route add 128.0.0.0/1 dev "$INTERFACE" + # Parse AllowedIPs from config and add routes dynamically + ALLOWED_IPS=$(grep -i '^AllowedIPs' "$CONFIG_FILE" | head -1 | sed 's/.*= *//;s/ //g') + + if [ -n "$ALLOWED_IPS" ]; then + for ip in $(echo "$ALLOWED_IPS" | tr ',' ' '); do + if [ "$ip" = "0.0.0.0/0" ]; then + # Use the split route trick to avoid overriding the default route + # (which would break the endpoint connection) + ip route add 0.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true + ip route add 128.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true + else + ip route add "$ip" dev "$INTERFACE" 2>/dev/null || true + fi + done + fi # ── Policy routing: ensure responses to incoming LAN traffic go back via eth0 ── if [ -n "$DEFAULT_GW" ] && [ -n "$DEFAULT_IF" ]; then @@ -170,6 +182,25 @@ start_vpn() { # ────────────────────────────────────────────── stop_vpn() { echo "[vpn] Stopping WireGuard interface ${INTERFACE}..." + + # Remove routes added for AllowedIPs + ALLOWED_IPS=$(grep -i '^AllowedIPs' "$CONFIG_FILE" | head -1 | sed 's/.*= *//;s/ //g') + if [ -n "$ALLOWED_IPS" ]; then + for ip in $(echo "$ALLOWED_IPS" | tr ',' ' '); do + if [ "$ip" = "0.0.0.0/0" ]; then + ip route del 0.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true + ip route del 128.0.0.0/1 dev "$INTERFACE" 2>/dev/null || true + else + ip route del "$ip" dev "$INTERFACE" 2>/dev/null || true + fi + done + fi + + # Remove endpoint route + if [ -n "$VPN_ENDPOINT" ]; then + ip route del "$VPN_ENDPOINT/32" 2>/dev/null || true + fi + ip link del "$INTERFACE" 2>/dev/null || true } diff --git a/Docker/test_vpn.py b/Docker/test_vpn.py index 91e4511..68265e3 100644 --- a/Docker/test_vpn.py +++ b/Docker/test_vpn.py @@ -6,23 +6,29 @@ Verifies: 2. The container starts and becomes healthy. 3. The public IP inside the VPN differs from the host IP. 4. Kill switch blocks traffic when WireGuard is down. +5. AllowedIPs routes are set dynamically from the config. Requirements: - podman installed - - Root/sudo (NET_ADMIN capability) + - Root/sudo (NET_ADMIN capability) for container runtime tests - A valid WireGuard config at ./wg0.conf (or ./nl.conf) Usage: + # Build-only test (no sudo needed): + python3 -m pytest test_vpn.py::TestVPNImage::test_00_build_image -v + + # Full integration test (requires sudo): sudo python3 -m pytest test_vpn.py -v # or sudo python3 test_vpn.py """ import logging +import os import subprocess +import sys import time import unittest -import os logger = logging.getLogger(__name__) @@ -35,6 +41,11 @@ STARTUP_TIMEOUT = 30 # seconds to wait for VPN to come up HEALTH_POLL_INTERVAL = 2 # seconds between health checks +def is_root() -> bool: + """Check if running as root.""" + return os.geteuid() == 0 + + def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess: """Run a command and return the result.""" return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=check) @@ -55,6 +66,7 @@ class TestVPNImage(unittest.TestCase): """Test suite for the WireGuard VPN container.""" host_ip: str = "" + container_id: str = "" @classmethod def setUpClass(cls): @@ -84,6 +96,12 @@ class TestVPNImage(unittest.TestCase): assert result.returncode == 0, f"Build failed:\n{result.stderr}" logger.info("Image built successfully.") + # Skip container runtime tests if not root + if not is_root(): + logger.warning("Not running as root — skipping container runtime tests.") + cls.container_id = "" + return + # ── 3. Start the container ── logger.info("Starting container '%s'...", CONTAINER_NAME) result = run( @@ -120,6 +138,8 @@ class TestVPNImage(unittest.TestCase): @classmethod def tearDownClass(cls): """Stop and remove the container.""" + if not is_root(): + return logger.info("Cleaning up test container...") subprocess.run(["podman", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False) logger.info("Cleanup complete.") @@ -144,10 +164,22 @@ class TestVPNImage(unittest.TestCase): ) return result.stdout.strip() + def _skip_if_not_root(self): + """Skip test if not running as root.""" + if not is_root(): + self.skipTest("This test requires root/sudo privileges") + # ── Tests ──────────────────────────────────────────────── + def test_00_build_image(self): + """The image builds successfully.""" + # This is already verified in setUpClass, just confirm here + result = run(["podman", "images", "--format", "{{.Repository}}:{{.Tag}}"]) + self.assertIn(IMAGE_NAME, result.stdout, "Image was not built") + def test_01_ip_differs_from_host(self): """Public IP inside VPN is different from host IP.""" + self._skip_if_not_root() vpn_ip = self._get_vpn_ip() logger.info("VPN public IP: %s", vpn_ip) logger.info("Host public IP: %s", self.host_ip) @@ -161,12 +193,26 @@ class TestVPNImage(unittest.TestCase): def test_02_wireguard_interface_exists(self): """The wg0 interface is present in the container.""" + self._skip_if_not_root() result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"]) self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}") self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output") - def test_03_kill_switch_blocks_traffic(self): + def test_03_allowedips_routes_set(self): + """Routes are set dynamically based on AllowedIPs from config.""" + self._skip_if_not_root() + # Check that routes exist for the AllowedIPs + result = podman_exec(CONTAINER_NAME, ["ip", "route", "show", "dev", "wg0"]) + self.assertEqual(result.returncode, 0, f"ip route show failed:\n{result.stderr}") + # The config has AllowedIPs = 0.0.0.0/0, which should result in: + # 0.0.0.0/1 dev wg0 and 128.0.0.0/1 dev wg0 + self.assertIn("0.0.0.0/1", result.stdout, "Route 0.0.0.0/1 not found") + self.assertIn("128.0.0.0/1", result.stdout, "Route 128.0.0.0/1 not found") + logger.info("AllowedIPs routes verified: %s", result.stdout.strip()) + + def test_04_kill_switch_blocks_traffic(self): """When WireGuard is down, traffic is blocked (kill switch).""" + self._skip_if_not_root() # Bring down the WireGuard interface by deleting it down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10) self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")