186 lines
6.9 KiB
Python
186 lines
6.9 KiB
Python
"""
|
|
Integration test for the WireGuard VPN Podman image.
|
|
|
|
Verifies:
|
|
1. The image builds successfully.
|
|
2. The container starts and becomes healthy.
|
|
3. The public IP inside the VPN differs from the host IP.
|
|
4. Kill switch blocks traffic when WireGuard is down.
|
|
|
|
Requirements:
|
|
- podman installed
|
|
- Root/sudo (NET_ADMIN capability)
|
|
- A valid WireGuard config at ./wg0.conf (or ./nl.conf)
|
|
|
|
Usage:
|
|
sudo python3 -m pytest test_vpn.py -v
|
|
# or
|
|
sudo python3 test_vpn.py
|
|
"""
|
|
|
|
import subprocess
|
|
import time
|
|
import unittest
|
|
import os
|
|
|
|
IMAGE_NAME = "vpn-wireguard-test"
|
|
CONTAINER_NAME = "vpn-test-container"
|
|
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wg0.conf")
|
|
BUILD_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
IP_CHECK_URL = "https://ifconfig.me"
|
|
STARTUP_TIMEOUT = 30 # seconds to wait for VPN to come up
|
|
HEALTH_POLL_INTERVAL = 2 # seconds between health checks
|
|
|
|
|
|
def run(cmd: list[str], timeout: int = 30, check: bool = True) -> subprocess.CompletedProcess:
|
|
"""Run a command and return the result."""
|
|
return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=check)
|
|
|
|
|
|
def get_host_ip() -> str:
|
|
"""Get the public IP of the host machine."""
|
|
result = run(["curl", "-s", "--max-time", "10", IP_CHECK_URL])
|
|
return result.stdout.strip()
|
|
|
|
|
|
def podman_exec(container: str, cmd: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
|
|
"""Execute a command inside a running container."""
|
|
return run(["podman", "exec", container] + cmd, timeout=timeout, check=False)
|
|
|
|
|
|
class TestVPNImage(unittest.TestCase):
|
|
"""Test suite for the WireGuard VPN container."""
|
|
|
|
host_ip: str = ""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Build image, get host IP, start container, wait for VPN."""
|
|
# Clean up any leftover container from a previous run
|
|
subprocess.run(
|
|
["podman", "rm", "-f", CONTAINER_NAME],
|
|
capture_output=True, check=False,
|
|
)
|
|
|
|
# ── 1. Get host public IP before VPN ──
|
|
print("\n[setup] Fetching host public IP...")
|
|
cls.host_ip = get_host_ip()
|
|
print(f"[setup] Host public IP: {cls.host_ip}")
|
|
assert cls.host_ip, "Could not determine host public IP"
|
|
|
|
# ── 2. Build the image ──
|
|
print(f"[setup] Building image '{IMAGE_NAME}'...")
|
|
result = run(
|
|
["podman", "build", "-t", IMAGE_NAME, BUILD_DIR],
|
|
timeout=180,
|
|
)
|
|
print(result.stdout[-500:] if len(result.stdout) > 500 else result.stdout)
|
|
assert result.returncode == 0, f"Build failed:\n{result.stderr}"
|
|
print("[setup] Image built successfully.")
|
|
|
|
# ── 3. Start the container ──
|
|
print(f"[setup] Starting container '{CONTAINER_NAME}'...")
|
|
result = run(
|
|
[
|
|
"podman", "run", "-d",
|
|
"--name", CONTAINER_NAME,
|
|
"--cap-add=NET_ADMIN",
|
|
"--cap-add=SYS_MODULE",
|
|
"--sysctl", "net.ipv4.ip_forward=1",
|
|
"-v", f"{CONFIG_FILE}:/etc/wireguard/wg0.conf:ro",
|
|
"-v", "/lib/modules:/lib/modules:ro",
|
|
IMAGE_NAME,
|
|
],
|
|
timeout=30,
|
|
check=False,
|
|
)
|
|
assert result.returncode == 0, f"Container failed to start:\n{result.stderr}"
|
|
cls.container_id = result.stdout.strip()
|
|
print(f"[setup] Container started: {cls.container_id[:12]}")
|
|
|
|
# Verify it's running
|
|
inspect = run(
|
|
["podman", "inspect", "-f", "{{.State.Running}}", CONTAINER_NAME],
|
|
check=False,
|
|
)
|
|
assert inspect.stdout.strip() == "true", "Container is not running"
|
|
|
|
# ── 4. Wait for VPN to come up ──
|
|
print(f"[setup] Waiting up to {STARTUP_TIMEOUT}s for VPN tunnel...")
|
|
vpn_up = cls._wait_for_vpn_cls(STARTUP_TIMEOUT)
|
|
assert vpn_up, f"VPN did not come up within {STARTUP_TIMEOUT}s"
|
|
print("[setup] VPN tunnel is up. Running tests.\n")
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
"""Stop and remove the container."""
|
|
print("\n[teardown] Cleaning up...")
|
|
subprocess.run(["podman", "rm", "-f", CONTAINER_NAME], capture_output=True, check=False)
|
|
print("[teardown] Done.")
|
|
|
|
@classmethod
|
|
def _wait_for_vpn_cls(cls, timeout: int = STARTUP_TIMEOUT) -> bool:
|
|
"""Wait until the VPN tunnel is up (can reach the internet)."""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
result = podman_exec(CONTAINER_NAME, ["ping", "-c", "1", "-W", "3", "1.1.1.1"])
|
|
if result.returncode == 0:
|
|
return True
|
|
time.sleep(HEALTH_POLL_INTERVAL)
|
|
return False
|
|
|
|
def _get_vpn_ip(self) -> str:
|
|
"""Get the public IP as seen from inside the container."""
|
|
result = podman_exec(
|
|
CONTAINER_NAME,
|
|
["curl", "-s", "--max-time", "10", IP_CHECK_URL],
|
|
timeout=20,
|
|
)
|
|
return result.stdout.strip()
|
|
|
|
# ── Tests ────────────────────────────────────────────────
|
|
|
|
def test_01_ip_differs_from_host(self):
|
|
"""Public IP inside VPN is different from host IP."""
|
|
vpn_ip = self._get_vpn_ip()
|
|
print(f"\n[test] VPN public IP: {vpn_ip}")
|
|
print(f"[test] Host public IP: {self.host_ip}")
|
|
|
|
self.assertTrue(vpn_ip, "Could not fetch IP from inside the container")
|
|
self.assertNotEqual(
|
|
vpn_ip,
|
|
self.host_ip,
|
|
f"VPN IP ({vpn_ip}) is the same as host IP — VPN is not working!",
|
|
)
|
|
|
|
def test_02_wireguard_interface_exists(self):
|
|
"""The wg0 interface is present in the container."""
|
|
result = podman_exec(CONTAINER_NAME, ["wg", "show", "wg0"])
|
|
self.assertEqual(result.returncode, 0, f"wg show failed:\n{result.stderr}")
|
|
self.assertIn("peer", result.stdout.lower(), "No peer information in wg show output")
|
|
|
|
def test_03_kill_switch_blocks_traffic(self):
|
|
"""When WireGuard is down, traffic is blocked (kill switch)."""
|
|
# Bring down the WireGuard interface by deleting it
|
|
down_result = podman_exec(CONTAINER_NAME, ["ip", "link", "del", "wg0"], timeout=10)
|
|
self.assertEqual(down_result.returncode, 0, f"ip link del wg0 failed:\n{down_result.stderr}")
|
|
|
|
# Give iptables a moment
|
|
time.sleep(2)
|
|
|
|
# Try to reach the internet — should fail due to kill switch
|
|
result = podman_exec(
|
|
CONTAINER_NAME,
|
|
["curl", "-s", "--max-time", "5", IP_CHECK_URL],
|
|
timeout=10,
|
|
)
|
|
self.assertNotEqual(
|
|
result.returncode, 0,
|
|
"Traffic went through even with WireGuard down — kill switch is NOT working!",
|
|
)
|
|
print("\n[test] Kill switch confirmed: traffic blocked with VPN down")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(verbosity=2)
|