Files
Aniworld/Docker/test_vpn.py
Lukas 98d4edad14 feat(vpn): dynamic AllowedIPs routing and improved test coverage
- Parse AllowedIPs from WireGuard config in entrypoint.sh
- Add/remove routes dynamically instead of hardcoded 0.0.0.0/1 split
- Handle both 0.0.0.0/0 and custom AllowedIPs
- Add route cleanup on VPN stop (endpoint + AllowedIPs)
- Update test_vpn.py with AllowedIPs route verification
- Allow non-root build-only tests with automatic runtime skip
2026-05-16 21:21:56 +02:00

238 lines
8.9 KiB
Python

"""
Integration test for the WireGuard VPN Podman image.
Verifies:
1. The image builds successfully.
2. The container starts and becomes healthy.
3. The public IP inside the VPN differs from the host IP.
4. Kill switch blocks traffic when WireGuard is down.
5. AllowedIPs routes are set dynamically from the config.
Requirements:
- podman installed
- Root/sudo (NET_ADMIN capability) for container runtime tests
- A valid WireGuard config at ./wg0.conf next to this file (a config with another name, e.g. ./nl.conf, must be copied or symlinked to wg0.conf)
Usage:
# Build-only test (no sudo needed):
python3 -m pytest test_vpn.py::TestVPNImage::test_00_build_image -v
# Full integration test (requires sudo):
sudo python3 -m pytest test_vpn.py -v
# or
sudo python3 test_vpn.py
"""
import logging
import os
import subprocess
import sys
import time
import unittest
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Names of the podman image and container created by this test.
IMAGE_NAME = "vpn-wireguard-test"
CONTAINER_NAME = "vpn-test-container"

# Directory containing this file; passed to `podman build` as the build context.
BUILD_DIR = os.path.dirname(os.path.abspath(__file__))
# WireGuard config that gets mounted into the container; lives next to this file.
CONFIG_FILE = os.path.join(BUILD_DIR, "wg0.conf")

# URL queried with curl to obtain a public IP address.
IP_CHECK_URL = "https://ifconfig.me"

STARTUP_TIMEOUT = 30  # seconds to wait for VPN to come up
HEALTH_POLL_INTERVAL = 2  # seconds between health checks
def is_root() -> bool:
    """Return True when the effective user ID of this process is 0 (root)."""
    effective_uid = os.geteuid()
    return effective_uid == 0
def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell, capturing stdout and stderr as text.

    Args:
        cmd: Program and arguments to execute.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The ``CompletedProcess`` with ``stdout``/``stderr`` populated.
    """
    options = {
        "capture_output": True,
        "text": True,
        "timeout": timeout,
        "check": check,
    }
    return subprocess.run(cmd, **options)
def get_host_ip() -> str:
    """Return the host's public IP as reported by ``IP_CHECK_URL``.

    The lookup shells out to curl (silent mode, 10 second limit) through
    ``run`` with its default ``check=True``, so a non-zero curl exit status
    raises ``subprocess.CalledProcessError``.

    Returns:
        The response body with surrounding whitespace stripped.
    """
    curl_cmd = ["curl", "-s", "--max-time", "10", IP_CHECK_URL]
    return run(curl_cmd).stdout.strip()
def podman_exec(container: str, cmd: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
    """Run *cmd* inside *container* via ``podman exec``.

    The call never raises on a non-zero exit status (``check=False``); callers
    inspect ``returncode`` themselves.

    Args:
        container: Name or ID of the running container.
        cmd: Program and arguments to execute inside the container.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.

    Returns:
        The ``CompletedProcess`` of the ``podman exec`` invocation.
    """
    exec_prefix = ["podman", "exec", container]
    return run(exec_prefix + cmd, timeout=timeout, check=False)
class TestVPNImage(unittest.TestCase):
    """Integration tests for the WireGuard VPN container image.

    ``setUpClass`` builds the image once and, when running as root, starts a
    single container that all tests share. The numeric prefixes in the test
    names keep the destructive kill-switch test (``test_04``, which deletes
    ``wg0``) after the tests that need a working tunnel.
    """

    # Public IP of the host, captured before the container is started.
    host_ip: str = ""
    # ID printed by `podman run -d`; stays empty when runtime tests are skipped.
    container_id: str = ""

    @classmethod
    def setUpClass(cls):
        """Build the image; when root, also start the container and wait for the VPN.

        Raises:
            AssertionError: If the build fails, the host IP cannot be
                determined, the container does not start, or the VPN does not
                come up within ``STARTUP_TIMEOUT`` seconds.
        """
        # Clean up any leftover container from a previous run
        cls._remove_container()

        # ── 1. Build the image ──
        logger.info("Building image '%s'...", IMAGE_NAME)
        # check=False: a failed build must reach the explicit check below so
        # podman's stderr ends up in the failure message instead of a bare
        # CalledProcessError.
        result = run(
            ["podman", "build", "-t", IMAGE_NAME, BUILD_DIR],
            timeout=180,
            check=False,
        )
        logger.debug("Build output: %s", result.stdout[-500:])
        if result.returncode != 0:
            raise AssertionError(f"Build failed:\n{result.stderr}")
        logger.info("Image built successfully.")

        # Skip container runtime tests if not root
        if not is_root():
            logger.warning("Not running as root — skipping container runtime tests.")
            cls.container_id = ""
            return

        # ── 2. Get host public IP before VPN ──
        # Only the runtime tests compare against it, so the build-only path
        # above does not depend on the IP lookup service.
        logger.info("Fetching host public IP...")
        cls.host_ip = get_host_ip()
        logger.info("Host public IP: %s", cls.host_ip)
        if not cls.host_ip:
            raise AssertionError("Could not determine host public IP")

        # ── 3. Start the container, 4. wait for the VPN ──
        try:
            cls._start_container()
            logger.info("Waiting up to %d seconds for VPN tunnel...", STARTUP_TIMEOUT)
            if not cls._wait_for_vpn_cls(STARTUP_TIMEOUT):
                raise AssertionError(f"VPN did not come up within {STARTUP_TIMEOUT}s")
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # the container has to be removed here to avoid leaking it.
            cls._log_container_output()
            cls._remove_container()
            raise
        logger.info("VPN tunnel is up. Running tests.")

    @classmethod
    def tearDownClass(cls):
        """Stop and remove the container (no-op when not running as root)."""
        if not is_root():
            return
        logger.info("Cleaning up test container...")
        cls._remove_container()
        logger.info("Cleanup complete.")

    @staticmethod
    def _remove_container():
        """Force-remove the test container, ignoring any error from podman."""
        subprocess.run(
            ["podman", "rm", "-f", CONTAINER_NAME],
            capture_output=True, check=False,
        )

    @staticmethod
    def _log_container_output():
        """Log the tail of the container's output before it is removed."""
        logs = subprocess.run(
            ["podman", "logs", "--tail", "50", CONTAINER_NAME],
            capture_output=True, text=True, check=False,
        )
        logger.error("Container output before cleanup:\n%s%s", logs.stdout, logs.stderr)

    @classmethod
    def _start_container(cls):
        """Start the detached test container and store its ID in ``container_id``.

        Raises:
            AssertionError: If ``podman run`` fails or the container is not in
                the running state afterwards.
        """
        logger.info("Starting container '%s'...", CONTAINER_NAME)
        result = run(
            [
                "podman", "run", "-d",
                "--name", CONTAINER_NAME,
                "--cap-add=NET_ADMIN",
                "--cap-add=SYS_MODULE",
                "--sysctl", "net.ipv4.ip_forward=1",
                "-v", f"{CONFIG_FILE}:/etc/wireguard/wg0.conf:ro",
                "-v", "/lib/modules:/lib/modules:ro",
                IMAGE_NAME,
            ],
            timeout=30,
            check=False,
        )
        if result.returncode != 0:
            raise AssertionError(f"Container failed to start:\n{result.stderr}")
        cls.container_id = result.stdout.strip()
        logger.info("Container started: %s", cls.container_id[:12])

        # Verify it's running
        inspect = run(
            ["podman", "inspect", "-f", "{{.State.Running}}", CONTAINER_NAME],
            check=False,
        )
        if inspect.stdout.strip() != "true":
            raise AssertionError("Container is not running")

    @classmethod
    def _wait_for_vpn_cls(cls, timeout: int = STARTUP_TIMEOUT) -> bool:
        """Poll until a ping to 1.1.1.1 from inside the container succeeds.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True as soon as one ping succeeds, False once *timeout* elapsed.
        """
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                result = podman_exec(CONTAINER_NAME, ["ping", "-c", "1", "-W", "3", "1.1.1.1"])
            except subprocess.TimeoutExpired:
                # A hung probe counts as "not up yet"; keep polling until the deadline.
                logger.debug("Connectivity probe timed out; retrying.")
            else:
                if result.returncode == 0:
                    return True
            time.sleep(HEALTH_POLL_INTERVAL)
        return False

    def _get_vpn_ip(self) -> str:
        """Return the public IP as seen from inside the container.

        The result is the stripped curl output, which is an empty string when
        curl printed nothing.
        """
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--max-time", "10", IP_CHECK_URL],
            timeout=20,
        )
        return result.stdout.strip()

    def _skip_if_not_root(self):
        """Skip the calling test when not running as root."""
        if not is_root():
            self.skipTest("This test requires root/sudo privileges")

    # ── Tests ────────────────────────────────────────────────

    def test_00_build_image(self):
        """The image builds successfully."""
        # The build itself happens in setUpClass; confirm the image is listed.
        result = run(["podman", "images", "--format", "{{.Repository}}:{{.Tag}}"])
        self.assertIn(IMAGE_NAME, result.stdout, "Image was not built")

    def test_01_ip_differs_from_host(self):
        """Public IP inside VPN is different from host IP."""
        self._skip_if_not_root()
        vpn_ip = self._get_vpn_ip()
        logger.info("VPN public IP: %s", vpn_ip)
        logger.info("Host public IP: %s", self.host_ip)
        self.assertTrue(vpn_ip, "Could not fetch IP from inside the container")
        self.assertNotEqual(
            vpn_ip,
            self.host_ip,
            f"VPN IP ({vpn_ip}) is the same as host IP — VPN is not working!",
        )

    def test_02_wireguard_interface_exists(self):
        """The wg0 interface is present in the container."""
        self._skip_if_not_root()
        result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"])
        self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}")
        self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output")

    def test_03_allowedips_routes_set(self):
        """Routes are set dynamically based on AllowedIPs from config."""
        self._skip_if_not_root()
        # Check that routes exist for the AllowedIPs
        result = podman_exec(CONTAINER_NAME, ["ip", "route", "show", "dev", "wg0"])
        self.assertEqual(result.returncode, 0, f"ip route show failed:\n{result.stderr}")
        # This test assumes the config has AllowedIPs = 0.0.0.0/0, which should result in:
        # 0.0.0.0/1 dev wg0 and 128.0.0.0/1 dev wg0
        self.assertIn("0.0.0.0/1", result.stdout, "Route 0.0.0.0/1 not found")
        self.assertIn("128.0.0.0/1", result.stdout, "Route 128.0.0.0/1 not found")
        logger.info("AllowedIPs routes verified: %s", result.stdout.strip())

    def test_04_kill_switch_blocks_traffic(self):
        """When WireGuard is down, traffic is blocked (kill switch)."""
        self._skip_if_not_root()
        # Bring down the WireGuard interface by deleting it
        down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10)
        self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")
        # Give iptables a moment
        time.sleep(2)
        # Try to reach the internet — should fail due to kill switch
        result = podman_exec(
            CONTAINER_NAME,
            ["curl", "-s", "--max-time", "5", IP_CHECK_URL],
            timeout=10,
        )
        self.assertNotEqual(
            result.returncode, 0,
            "Traffic went through even with WireGuard down — kill switch is NOT working!",
        )
        logger.info("Kill switch confirmed: traffic blocked with VPN down")
if __name__ == "__main__":
    # No logging handler is configured elsewhere in this file, so without this
    # call the INFO-level progress messages would be dropped when the file is
    # executed directly (only WARNING and above reach stderr by default).
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    unittest.main(verbosity=2)