Files
Aniworld/Docker/test_vpn.py
Lukas 50a7083ce5 fix(vpn): support AllowedIPs=0.0.0.0/0 and multi-DNS configs
- Parse AllowedIPs dynamically from WireGuard config instead of hardcoding routes
- Remove auto-created default route by wg setconf to prevent breaking endpoint connection
- Fix DNS parsing: write comma-separated DNS servers as separate nameserver lines
- Add test for AllowedIPs route verification and DNS configuration
- Update test to skip container runtime tests when not running as root
2026-05-16 21:41:27 +02:00

254 lines
10 KiB
Python

"""
Integration test for the WireGuard VPN Podman image.
Verifies:
1. The image builds successfully.
2. The container starts and becomes healthy.
3. The public IP inside the VPN differs from the host IP.
4. Kill switch blocks traffic when WireGuard is down.
5. AllowedIPs routes are set dynamically from the config.
6. DNS servers from the config are written as separate nameserver lines.
Requirements:
- podman installed
- Root/sudo (NET_ADMIN capability) for container runtime tests
- A valid WireGuard config named wg0.conf in the same directory as this test file (copy or rename another config such as nl.conf to wg0.conf)
Usage:
# Build-only test (no sudo needed):
python3 -m pytest test_vpn.py::TestVPNImage::test_00_build_image -v
# Full integration test (requires sudo):
sudo python3 -m pytest test_vpn.py -v
# or
sudo python3 test_vpn.py
"""
import logging
import os
import subprocess
import sys
import time
import unittest
# Module-level logger used for progress messages throughout the test run.
logger = logging.getLogger(__name__)

# Directory containing this test file; passed to `podman build` as the context.
BUILD_DIR = os.path.dirname(os.path.abspath(__file__))
# WireGuard config that gets bind-mounted into the container.
CONFIG_FILE = os.path.join(BUILD_DIR, "wg0.conf")

# Names given to the image and the container created by the tests.
IMAGE_NAME = "vpn-wireguard-test"
CONTAINER_NAME = "vpn-test-container"

# Endpoint queried with curl to learn the caller's public IP.
IP_CHECK_URL = "https://ifconfig.me"

STARTUP_TIMEOUT = 30  # seconds to wait for VPN to come up
HEALTH_POLL_INTERVAL = 2  # seconds between health checks
def is_root() -> bool:
    """Return True when the effective user ID of this process is 0 (root)."""
    effective_uid = os.geteuid()
    return effective_uid == 0
def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess:
    """Run *cmd* as a subprocess, capturing stdout and stderr as text.

    Args:
        cmd: Argument vector to execute.
        timeout: Seconds to wait before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit raises ``subprocess.CalledProcessError``.

    Returns:
        The ``CompletedProcess`` with ``stdout`` and ``stderr`` populated.
    """
    completed = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        check=check,
    )
    return completed
def get_host_ip() -> str:
    """Return the host's public IP as reported by ``IP_CHECK_URL``.

    The lookup shells out to ``curl`` through ``run`` with its default
    ``check=True``, so a non-zero curl exit raises
    ``subprocess.CalledProcessError``.
    """
    curl_cmd = ["curl", "-s", "--max-time", "10", IP_CHECK_URL]
    response = run(curl_cmd)
    return response.stdout.strip()
def podman_exec(container: str, cmd: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
    """Run *cmd* inside *container* via ``podman exec``.

    The command is run with ``check=False``; callers inspect ``returncode``
    themselves instead of relying on an exception.

    Args:
        container: Name or ID of the running container.
        cmd: Argument vector to execute inside the container.
        timeout: Seconds to wait for the ``podman exec`` call to finish.

    Returns:
        The ``CompletedProcess`` of the ``podman exec`` invocation.
    """
    full_command = ["podman", "exec", container] + cmd
    return run(full_command, timeout=timeout, check=False)
class TestVPNImage(unittest.TestCase):
    """Integration tests for the WireGuard VPN container.

    ``setUpClass`` builds the image once and, when running as root, starts a
    single container shared by all tests.  The kill-switch test deletes the
    ``wg0`` interface, so it is deliberately the last test both in definition
    order and in name order.
    """

    # Public IP of the host, captured before the container is started.
    host_ip: str = ""
    # ID printed by ``podman run``; stays empty when runtime tests are skipped.
    container_id: str = ""

    @classmethod
    def _remove_container(cls) -> None:
        """Force-remove the test container, ignoring a non-zero exit status."""
        subprocess.run(
            ["podman", "rm", "-f", CONTAINER_NAME],
            capture_output=True, check=False,
        )

    @classmethod
    def setUpClass(cls):
        """Fetch the host IP, build the image, start the container, wait for VPN.

        When not running as root, only the host IP lookup and the image build
        happen; ``container_id`` stays empty and the runtime tests skip
        themselves.

        Raises:
            AssertionError: If the host IP cannot be determined, the build
                fails, the container does not start or is not running, or the
                VPN does not come up within ``STARTUP_TIMEOUT`` seconds.
        """
        # Clean up any leftover container from a previous run
        cls._remove_container()

        # ── 1. Get host public IP before VPN ──
        logger.info("Fetching host public IP...")
        cls.host_ip = get_host_ip()
        logger.info("Host public IP: %s", cls.host_ip)
        assert cls.host_ip, "Could not determine host public IP"

        # ── 2. Build the image ──
        logger.info("Building image '%s'...", IMAGE_NAME)
        # check=False so that a failed build reaches the assertion below and
        # reports podman's stderr instead of raising CalledProcessError first.
        result = run(
            ["podman", "build", "-t", IMAGE_NAME, BUILD_DIR],
            timeout=180,
            check=False,
        )
        # Only the tail of the build log is interesting at debug level.
        logger.debug("Build output: %s", result.stdout[-500:])
        assert result.returncode == 0, f"Build failed:\n{result.stderr}"
        logger.info("Image built successfully.")

        # Skip container runtime tests if not root
        if not is_root():
            logger.warning("Not running as root — skipping container runtime tests.")
            cls.container_id = ""
            return

        # tearDownClass is not invoked when setUpClass raises, so remove the
        # container here if anything goes wrong after it has been created.
        try:
            # ── 3. Start the container ──
            logger.info("Starting container '%s'...", CONTAINER_NAME)
            result = run(
                [
                    "podman", "run", "-d",
                    "--name", CONTAINER_NAME,
                    "--cap-add=NET_ADMIN",
                    "--cap-add=SYS_MODULE",
                    "--sysctl", "net.ipv4.ip_forward=1",
                    "-v", f"{CONFIG_FILE}:/etc/wireguard/wg0.conf:ro",
                    "-v", "/lib/modules:/lib/modules:ro",
                    IMAGE_NAME,
                ],
                timeout=30,
                check=False,
            )
            assert result.returncode == 0, f"Container failed to start:\n{result.stderr}"
            cls.container_id = result.stdout.strip()
            logger.info("Container started: %s", cls.container_id[:12])

            # Verify it's running
            inspect = run(
                ["podman", "inspect", "-f", "{{.State.Running}}", CONTAINER_NAME],
                check=False,
            )
            assert inspect.stdout.strip() == "true", "Container is not running"

            # ── 4. Wait for VPN to come up ──
            logger.info("Waiting up to %d seconds for VPN tunnel...", STARTUP_TIMEOUT)
            vpn_up = cls._wait_for_vpn_cls(STARTUP_TIMEOUT)
            assert vpn_up, f"VPN did not come up within {STARTUP_TIMEOUT}s"
        except BaseException:
            logger.info("Setup failed; removing test container...")
            cls._remove_container()
            raise
        logger.info("VPN tunnel is up. Running tests.")

    @classmethod
    def tearDownClass(cls):
        """Stop and remove the container (no-op when not running as root)."""
        if not is_root():
            return
        logger.info("Cleaning up test container...")
        cls._remove_container()
        logger.info("Cleanup complete.")

    @classmethod
    def _wait_for_vpn_cls(cls, timeout: int = STARTUP_TIMEOUT) -> bool:
        """Poll until a ping to 1.1.1.1 from inside the container succeeds.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True as soon as one ping succeeds, False once the deadline passes.
        """
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            result = podman_exec(CONTAINER_NAME, ["ping", "-c", "1", "-W", "3", "1.1.1.1"])
            if result.returncode == 0:
                return True
            time.sleep(HEALTH_POLL_INTERVAL)
        return False

    def _get_vpn_ip(self) -> str:
        """Return the public IP as seen from inside the container.

        The result is curl's stripped stdout, so it is empty when curl
        produced no output.
        """
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--max-time", "10", IP_CHECK_URL],
            timeout=20,
        )
        return result.stdout.strip()

    def _skip_if_not_root(self):
        """Skip the current test unless running as root."""
        if not is_root():
            self.skipTest("This test requires root/sudo privileges")

    # ── Tests ────────────────────────────────────────────────

    def test_00_build_image(self):
        """The image builds successfully."""
        # The build itself happens in setUpClass; confirm the image is listed.
        result = run(["podman", "images", "--format", "{{.Repository}}:{{.Tag}}"])
        self.assertIn(IMAGE_NAME, result.stdout, "Image was not built")

    def test_01_ip_differs_from_host(self):
        """Public IP inside VPN is different from host IP."""
        self._skip_if_not_root()
        vpn_ip = self._get_vpn_ip()
        logger.info("VPN public IP: %s", vpn_ip)
        logger.info("Host public IP: %s", self.host_ip)
        self.assertTrue(vpn_ip, "Could not fetch IP from inside the container")
        self.assertNotEqual(
            vpn_ip,
            self.host_ip,
            f"VPN IP ({vpn_ip}) is the same as host IP — VPN is not working!",
        )

    def test_02_wireguard_interface_exists(self):
        """The wg0 interface is present in the container."""
        self._skip_if_not_root()
        result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"])
        self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}")
        wg_output = result.stdout.lower()
        self.assertIn("peer", wg_output, "No peer information in wg show output")
        # AllowedIPs should be present in wg show output
        self.assertIn("allowed ips", wg_output, "AllowedIPs not found in wg show output")

    def test_03_allowedips_routes_set(self):
        """Routes are set dynamically based on AllowedIPs from config."""
        self._skip_if_not_root()
        # Check that routes exist for the AllowedIPs
        result = podman_exec(CONTAINER_NAME, ["ip", "route", "show", "dev", "wg0"])
        self.assertEqual(result.returncode, 0, f"ip route show failed:\n{result.stderr}")
        # The config has AllowedIPs = 0.0.0.0/0, which should result in:
        # 0.0.0.0/1 dev wg0 and 128.0.0.0/1 dev wg0
        self.assertIn("0.0.0.0/1", result.stdout, "Route 0.0.0.0/1 not found")
        self.assertIn("128.0.0.0/1", result.stdout, "Route 128.0.0.0/1 not found")
        # Make sure there is NO default route through wg0 (Table = off should
        # prevent this).  The listing is already restricted to wg0, and some
        # `ip` implementations may omit the "dev wg0" token in that case, so
        # match on the leading "default" keyword rather than a fixed substring.
        default_routes = [
            line
            for line in result.stdout.splitlines()
            if line.split()[:1] == ["default"]
        ]
        self.assertEqual(
            default_routes,
            [],
            "Default route through wg0 found — Table = off not working!",
        )
        logger.info("AllowedIPs routes verified: %s", result.stdout.strip())

    def test_03b_dns_configured(self):
        """DNS is configured correctly with multiple nameserver lines."""
        self._skip_if_not_root()
        result = podman_exec(CONTAINER_NAME, ["cat", "/etc/resolv.conf"])
        self.assertEqual(result.returncode, 0, f"cat /etc/resolv.conf failed:\n{result.stderr}")
        # Should have two separate nameserver lines, not one with commas
        self.assertIn("nameserver 198.18.0.1", result.stdout, "DNS 198.18.0.1 not found")
        self.assertIn("nameserver 198.18.0.2", result.stdout, "DNS 198.18.0.2 not found")
        # Make sure there are no commas in nameserver lines
        self.assertNotIn(
            "nameserver 198.18.0.1,198.18.0.2",
            result.stdout,
            "DNS servers written on one line with comma!",
        )
        logger.info("DNS config verified: %s", result.stdout.strip())

    def test_04_kill_switch_blocks_traffic(self):
        """When WireGuard is down, traffic is blocked (kill switch)."""
        self._skip_if_not_root()
        # Bring down the WireGuard interface by deleting it
        down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10)
        self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")
        # Give iptables a moment
        time.sleep(2)
        # Try to reach the internet — should fail due to kill switch
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--max-time", "5", IP_CHECK_URL],
            timeout=10,
        )
        self.assertNotEqual(
            result.returncode, 0,
            "Traffic went through even with WireGuard down — kill switch is NOT working!",
        )
        logger.info("Kill switch confirmed: traffic blocked with VPN down")
if __name__ == "__main__":
    # Without a configured handler the INFO-level progress messages emitted
    # through the module logger are dropped when this file is run directly
    # (Python's last-resort handler only shows WARNING and above).
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
    )
    unittest.main(verbosity=2)