- Parse AllowedIPs dynamically from WireGuard config instead of hardcoding routes - Remove auto-created default route by wg setconf to prevent breaking endpoint connection - Fix DNS parsing: write comma-separated DNS servers as separate nameserver lines - Add test for AllowedIPs route verification and DNS configuration - Update test to skip container runtime tests when not running as root
254 lines
10 KiB
Python
254 lines
10 KiB
Python
"""
|
|
Integration test for the WireGuard VPN Podman image.
|
|
|
|
Verifies:
|
|
1. The image builds successfully.
|
|
2. The container starts and becomes healthy.
|
|
3. The public IP inside the VPN differs from the host IP.
|
|
4. Kill switch blocks traffic when WireGuard is down.
|
|
5. AllowedIPs routes are set dynamically from the config.
|
|
|
|
Requirements:
|
|
- podman installed
|
|
- Root/sudo (NET_ADMIN capability) for container runtime tests
|
|
- A valid WireGuard config at ./wg0.conf (or ./nl.conf)
|
|
|
|
Usage:
|
|
# Build-only test (no sudo needed):
|
|
python3 -m pytest test_vpn.py::TestVPNImage::test_00_build_image -v
|
|
|
|
# Full integration test (requires sudo):
|
|
sudo python3 -m pytest test_vpn.py -v
|
|
# or
|
|
sudo python3 test_vpn.py
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import unittest
|
|
|
|
logger = logging.getLogger(__name__)

# Names given to the image and container that the test run creates.
IMAGE_NAME = "vpn-wireguard-test"
CONTAINER_NAME = "vpn-test-container"

# The directory holding this file is used as the build context, and the
# WireGuard config is read from that same directory.
BUILD_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(BUILD_DIR, "wg0.conf")

# URL fetched with curl to read back the caller's public IP.
IP_CHECK_URL = "https://ifconfig.me"

STARTUP_TIMEOUT = 30  # seconds to wait for VPN to come up
HEALTH_POLL_INTERVAL = 2  # seconds between health checks
|
|
|
|
|
|
def is_root() -> bool:
    """Return True when the effective user ID of this process is 0."""
    effective_uid = os.geteuid()
    return effective_uid == 0
|
|
|
|
|
|
def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess:
    """Run *cmd* with stdout/stderr captured as text and return the result.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit raises
            ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=timeout,
        check=check,
    )
    return completed
|
|
|
|
|
|
def get_host_ip() -> str:
    """Return the host's public IP as reported by ``IP_CHECK_URL``.

    curl is invoked with ``--fail`` so that an HTTP error status makes it exit
    non-zero instead of printing the error page. Because ``run`` is called
    with its default ``check=True``, such a failure raises
    ``subprocess.CalledProcessError`` rather than returning the error body as
    if it were an address.

    Returns:
        The response body with surrounding whitespace stripped.
    """
    result = run(["curl", "-s", "--fail", "--max-time", "10", IP_CHECK_URL])
    return result.stdout.strip()
|
|
|
|
|
|
def podman_exec(container: str, cmd: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
    """Run *cmd* inside *container* via ``podman exec``.

    The call never raises on a non-zero exit status; callers inspect
    ``returncode`` on the returned object themselves.
    """
    argv = ["podman", "exec", container] + cmd
    return run(argv, timeout=timeout, check=False)
|
|
|
|
|
|
class TestVPNImage(unittest.TestCase):
    """Test suite for the WireGuard VPN container.

    ``setUpClass`` builds the image and, when running as root, starts one
    shared container and waits for the tunnel. The individual tests inspect
    that container. The kill-switch test deletes ``wg0`` and is therefore
    numbered last.
    """

    # Public IP of the host, captured before the container is started.
    host_ip: str = ""
    # Output of `podman run -d`; stays empty when runtime tests are skipped.
    container_id: str = ""

    @classmethod
    def setUpClass(cls):
        """Build image, get host IP, start container, wait for VPN.

        The host IP lookup and the image build always run. Starting the
        container and waiting for the tunnel only happen as root; otherwise
        ``container_id`` is left empty and the runtime tests skip themselves.

        If anything fails after ``podman run`` has been attempted, the
        container is removed before the error propagates, because unittest
        does not call ``tearDownClass`` when ``setUpClass`` raises.

        Raises:
            AssertionError: if the host IP is empty, the build fails, the
                container does not start or is not running, or the VPN does
                not come up within ``STARTUP_TIMEOUT`` seconds.
        """
        # Clean up any leftover container from a previous run
        cls._remove_container()

        # ── 1. Get host public IP before VPN ──
        logger.info("Fetching host public IP...")
        cls.host_ip = get_host_ip()
        logger.info("Host public IP: %s", cls.host_ip)
        assert cls.host_ip, "Could not determine host public IP"

        # ── 2. Build the image ──
        logger.info("Building image '%s'...", IMAGE_NAME)
        # check=False so that a failed build reaches the assertion below and
        # reports podman's stderr instead of a bare CalledProcessError.
        result = run(
            ["podman", "build", "-t", IMAGE_NAME, BUILD_DIR],
            timeout=180,
            check=False,
        )
        # Slicing already returns the whole string when it is shorter.
        logger.debug("Build output: %s", result.stdout[-500:])
        assert result.returncode == 0, f"Build failed:\n{result.stderr}"
        logger.info("Image built successfully.")

        # Skip container runtime tests if not root
        if not is_root():
            logger.warning("Not running as root — skipping container runtime tests.")
            cls.container_id = ""
            return

        try:
            # ── 3. Start the container ──
            logger.info("Starting container '%s'...", CONTAINER_NAME)
            result = run(
                [
                    "podman", "run", "-d",
                    "--name", CONTAINER_NAME,
                    "--cap-add=NET_ADMIN",
                    "--cap-add=SYS_MODULE",
                    "--sysctl", "net.ipv4.ip_forward=1",
                    "-v", f"{CONFIG_FILE}:/etc/wireguard/wg0.conf:ro",
                    "-v", "/lib/modules:/lib/modules:ro",
                    IMAGE_NAME,
                ],
                timeout=30,
                check=False,
            )
            assert result.returncode == 0, f"Container failed to start:\n{result.stderr}"
            cls.container_id = result.stdout.strip()
            logger.info("Container started: %s", cls.container_id[:12])

            # Verify it's running
            inspect = run(
                ["podman", "inspect", "-f", "{{.State.Running}}", CONTAINER_NAME],
                check=False,
            )
            assert inspect.stdout.strip() == "true", "Container is not running"

            # ── 4. Wait for VPN to come up ──
            logger.info("Waiting up to %d seconds for VPN tunnel...", STARTUP_TIMEOUT)
            vpn_up = cls._wait_for_vpn_cls(STARTUP_TIMEOUT)
            assert vpn_up, f"VPN did not come up within {STARTUP_TIMEOUT}s"
        except BaseException:
            # tearDownClass will not run after a setUpClass failure, so the
            # container has to be removed here or it leaks into the next run.
            cls._remove_container()
            raise
        logger.info("VPN tunnel is up. Running tests.")

    @classmethod
    def tearDownClass(cls):
        """Stop and remove the container (only started when running as root)."""
        if not is_root():
            return
        logger.info("Cleaning up test container...")
        cls._remove_container()
        logger.info("Cleanup complete.")

    @classmethod
    def _remove_container(cls) -> None:
        """Force-remove the test container, ignoring the exit status."""
        subprocess.run(
            ["podman", "rm", "-f", CONTAINER_NAME],
            capture_output=True, check=False,
        )

    @classmethod
    def _wait_for_vpn_cls(cls, timeout: int = STARTUP_TIMEOUT) -> bool:
        """Wait until the VPN tunnel is up (can reach the internet).

        Pings 1.1.1.1 from inside the container every
        ``HEALTH_POLL_INTERVAL`` seconds until one ping succeeds or
        *timeout* seconds have elapsed.

        Returns:
            True as soon as a ping succeeds, False if the deadline passes.
        """
        # Monotonic clock: the deadline must not move if the wall clock is
        # adjusted while we are waiting.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                result = podman_exec(CONTAINER_NAME, ["ping", "-c", "1", "-W", "3", "1.1.1.1"])
            except subprocess.TimeoutExpired:
                # A hung `podman exec` counts as one failed probe, not as a
                # reason to abort the whole wait.
                logger.debug("Connectivity probe timed out; retrying.")
            else:
                if result.returncode == 0:
                    return True
            time.sleep(HEALTH_POLL_INTERVAL)
        return False

    def _get_vpn_ip(self) -> str:
        """Get the public IP as seen from inside the container.

        curl runs with ``--fail`` so an HTTP error status yields empty output
        (and thus an empty return value) instead of an error page that would
        trivially differ from the host IP.
        """
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--fail", "--max-time", "10", IP_CHECK_URL],
            timeout=20,
        )
        return result.stdout.strip()

    def _skip_if_not_root(self):
        """Skip test if not running as root."""
        if not is_root():
            self.skipTest("This test requires root/sudo privileges")

    # ── Tests ────────────────────────────────────────────────

    def test_00_build_image(self):
        """The image builds successfully."""
        # This is already verified in setUpClass, just confirm here
        result = run(["podman", "images", "--format", "{{.Repository}}:{{.Tag}}"])
        self.assertIn(IMAGE_NAME, result.stdout, "Image was not built")

    def test_01_ip_differs_from_host(self):
        """Public IP inside VPN is different from host IP."""
        self._skip_if_not_root()
        vpn_ip = self._get_vpn_ip()
        logger.info("VPN public IP: %s", vpn_ip)
        logger.info("Host public IP: %s", self.host_ip)

        self.assertTrue(vpn_ip, "Could not fetch IP from inside the container")
        self.assertNotEqual(
            vpn_ip,
            self.host_ip,
            f"VPN IP ({vpn_ip}) is the same as host IP — VPN is not working!",
        )

    def test_02_wireguard_interface_exists(self):
        """The wg0 interface is present in the container."""
        self._skip_if_not_root()
        result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"])
        self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}")
        self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output")
        # AllowedIPs should be present in wg show output
        self.assertIn("allowed ips", result.stdout.lower(), "AllowedIPs not found in wg show output")

    def test_03_allowedips_routes_set(self):
        """Routes are set dynamically based on AllowedIPs from config."""
        self._skip_if_not_root()
        # Check that routes exist for the AllowedIPs
        result = podman_exec(CONTAINER_NAME, ["ip", "route", "show", "dev", "wg0"])
        self.assertEqual(result.returncode, 0, f"ip route show failed:\n{result.stderr}")
        # The config has AllowedIPs = 0.0.0.0/0, which should result in:
        # 0.0.0.0/1 dev wg0 and 128.0.0.0/1 dev wg0
        self.assertIn("0.0.0.0/1", result.stdout, "Route 0.0.0.0/1 not found")
        self.assertIn("128.0.0.0/1", result.stdout, "Route 128.0.0.0/1 not found")
        # Make sure there is NO default route through wg0 (Table = off should prevent this)
        self.assertNotIn("default dev wg0", result.stdout, "Default route through wg0 found — Table = off not working!")
        logger.info("AllowedIPs routes verified: %s", result.stdout.strip())

    def test_03b_dns_configured(self):
        """DNS is configured correctly with multiple nameserver lines."""
        self._skip_if_not_root()
        result = podman_exec(CONTAINER_NAME, ["cat", "/etc/resolv.conf"])
        self.assertEqual(result.returncode, 0, f"cat /etc/resolv.conf failed:\n{result.stderr}")
        # Should have two separate nameserver lines, not one with commas
        self.assertIn("nameserver 198.18.0.1", result.stdout, "DNS 198.18.0.1 not found")
        self.assertIn("nameserver 198.18.0.2", result.stdout, "DNS 198.18.0.2 not found")
        # Make sure there are no commas in nameserver lines
        self.assertNotIn("nameserver 198.18.0.1,198.18.0.2", result.stdout, "DNS servers written on one line with comma!")
        logger.info("DNS config verified: %s", result.stdout.strip())

    def test_04_kill_switch_blocks_traffic(self):
        """When WireGuard is down, traffic is blocked (kill switch)."""
        self._skip_if_not_root()
        # Bring down the WireGuard interface by deleting it
        down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10)
        self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")

        # Give iptables a moment
        time.sleep(2)

        # Probe an IP literal first. The hostname-based curl below can fail
        # purely because the configured DNS servers (198.18.0.x) are no
        # longer reachable, which would hide a leak; a ping to 1.1.1.1 needs
        # no DNS, so it only fails if traffic is actually blocked.
        # NOTE(review): assumes the kill switch is meant to drop ICMP as well
        # as TCP/UDP — confirm against the container's firewall rules.
        ping_result = podman_exec(
            CONTAINER_NAME,
            ["ping", "-c", "1", "-W", "3", "1.1.1.1"],
            timeout=10,
        )
        self.assertNotEqual(
            ping_result.returncode, 0,
            "Ping to 1.1.1.1 succeeded with WireGuard down — kill switch is NOT working!",
        )

        # Try to reach the internet — should fail due to kill switch
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--max-time", "5", IP_CHECK_URL],
            timeout=10,
        )
        self.assertNotEqual(
            result.returncode, 0,
            "Traffic went through even with WireGuard down — kill switch is NOT working!",
        )
        logger.info("Kill switch confirmed: traffic blocked with VPN down")
|
|
|
|
|
|
if __name__ == "__main__":
    # Nothing else in this file configures logging, so without this the
    # logger.info progress messages are dropped when run as a script
    # (the default threshold is WARNING).
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    unittest.main(verbosity=2)
|