"""Tests for blocklist refactored components. Tests the individual components (downloader, parser, ban executor, workflow) that were extracted from the monolithic blocklist_service. """ from __future__ import annotations import asyncio from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import aiohttp import pytest from app.exceptions import JailNotFoundError, JailOperationError from app.models.blocklist import BlocklistSource from app.services.blocklist_ban_executor import BanExecutor from app.services.blocklist_downloader import BlocklistDownloader from app.services.blocklist_import_workflow import BlocklistImportWorkflow from app.services.blocklist_parser import BlocklistParser, ParsedBlocklist class TestBlocklistDownloader: """Test BlocklistDownloader component.""" @pytest.mark.asyncio async def test_download_successful(self) -> None: """Test successful download.""" http_session = MagicMock(spec=aiohttp.ClientSession) response = AsyncMock() response.status = 200 response.text = AsyncMock(return_value="192.168.1.1\n10.0.0.1") http_session.get = MagicMock(return_value=AsyncMock(__aenter__=AsyncMock(return_value=response))) downloader = BlocklistDownloader(http_session) status, text = await downloader.download( "https://example.com/blocklist.txt", aiohttp.ClientTimeout(total=30), ) assert status == 200 assert text == "192.168.1.1\n10.0.0.1" @pytest.mark.asyncio async def test_download_retries_on_429(self) -> None: """Test retry logic on HTTP 429.""" http_session = MagicMock(spec=aiohttp.ClientSession) response_429 = AsyncMock() response_429.status = 429 response_429.text = AsyncMock(return_value="rate limited") response_200 = AsyncMock() response_200.status = 200 response_200.text = AsyncMock(return_value="192.168.1.1") http_session.get = MagicMock( side_effect=[ AsyncMock(__aenter__=AsyncMock(return_value=response_429)), AsyncMock(__aenter__=AsyncMock(return_value=response_200)), ] ) downloader = BlocklistDownloader(http_session, backoff_base=0.01) status, text = await downloader.download( "https://example.com/blocklist.txt", aiohttp.ClientTimeout(total=30), ) assert status == 200 assert text == "192.168.1.1" assert http_session.get.call_count == 2 @pytest.mark.asyncio async def test_download_fails_after_max_retries(self) -> None: """Test download fails after exhausting retries.""" http_session = MagicMock(spec=aiohttp.ClientSession) response_error = AsyncMock() response_error.status = 503 response_error.text = AsyncMock(return_value="service unavailable") http_session.get = MagicMock( side_effect=[ AsyncMock(__aenter__=AsyncMock(return_value=response_error)), AsyncMock(__aenter__=AsyncMock(return_value=response_error)), ] ) downloader = BlocklistDownloader(http_session, backoff_base=0.01) status, text = await downloader.download( "https://example.com/blocklist.txt", aiohttp.ClientTimeout(total=30), ) # After max retries exhausted, returns the last response assert status == 503 assert http_session.get.call_count == 2 class TestBlocklistParser: """Test BlocklistParser component.""" def test_parse_valid_ips(self) -> None: """Test parsing content with valid IPs.""" content = "192.168.1.1\n10.0.0.1\n# Comment\n172.16.0.1" result = BlocklistParser.parse(content) assert result.valid_ips == ["192.168.1.1", "10.0.0.1", "172.16.0.1"] assert result.skipped_entries == 0 # Comments are not counted as skipped def test_parse_skips_cidrs(self) -> None: """Test that CIDR ranges are skipped.""" content = "192.168.1.0/24\n10.0.0.1\n172.16.0.0/16" result = BlocklistParser.parse(content) assert result.valid_ips == ["10.0.0.1"] assert result.skipped_entries == 2 def test_parse_skips_malformed(self) -> None: """Test that malformed entries are skipped.""" content = "192.168.1.1\ninvalid\n10.0.0.1\nNOT_AN_IP" result = BlocklistParser.parse(content) assert result.valid_ips == ["192.168.1.1", "10.0.0.1"] assert result.skipped_entries == 2 def test_parse_skips_empty_lines(self) -> None: """Test that empty lines are skipped.""" content = "192.168.1.1\n\n10.0.0.1\n\n" result = BlocklistParser.parse(content) assert result.valid_ips == ["192.168.1.1", "10.0.0.1"] assert result.skipped_entries == 0 def test_parse_ipv6_addresses(self) -> None: """Test parsing IPv6 addresses.""" content = "2001:db8::1\nfe80::1\n192.168.1.1" result = BlocklistParser.parse(content) assert "2001:db8::1" in result.valid_ips assert "fe80::1" in result.valid_ips assert "192.168.1.1" in result.valid_ips def test_parse_with_stats(self) -> None: """Test parse_with_stats returns samples and statistics.""" content = "\n".join( ["192.168.1.{}".format(i) for i in range(1, 30)] + ["# Comment"] + ["invalid_entry"] ) entries, stats = BlocklistParser.parse_with_stats(content, sample_lines=10) assert len(entries) <= 10 assert stats["total_lines"] == 31 assert stats["valid_count"] == 29 assert stats["skipped_count"] == 1 # Only invalid_entry is skipped (comment is ignored) def test_parsed_blocklist_properties(self) -> None: """Test ParsedBlocklist properties.""" result = ParsedBlocklist( valid_ips=["192.168.1.1", "10.0.0.1"], skipped_entries=3, ) assert result.valid_ips == ["192.168.1.1", "10.0.0.1"] assert result.skipped_entries == 3 assert result.total_entries == 5 class TestBanExecutor: """Test BanExecutor component.""" @pytest.mark.asyncio async def test_ban_ips_success(self) -> None: """Test successful banning of IPs.""" ban_ip = AsyncMock() executor = BanExecutor(ban_ip) ips = ["192.168.1.1", "10.0.0.1", "172.16.0.1"] successful, failed, error = await executor.ban_ips( "/var/run/fail2ban/fail2ban.sock", "blocklist-import", ips, ) assert successful == 3 assert failed == 0 assert error is None assert ban_ip.call_count == 3 @pytest.mark.asyncio async def test_ban_ips_stops_on_jail_not_found(self) -> None: """Test that banning stops when jail is not found.""" ban_ip = AsyncMock() ban_ip.side_effect = [ None, # First ban succeeds JailNotFoundError("Jail not found"), ] executor = BanExecutor(ban_ip) ips = ["192.168.1.1", "10.0.0.1", "172.16.0.1"] successful, failed, error = await executor.ban_ips( "/var/run/fail2ban/fail2ban.sock", "blocklist-import", ips, ) assert successful == 1 assert failed == 0 assert "Jail not found" in error assert ban_ip.call_count == 2 # Stops after jail not found @pytest.mark.asyncio async def test_ban_ips_continues_on_operation_error(self) -> None: """Test that banning continues on individual operation errors.""" ban_ip = AsyncMock() ban_ip.side_effect = [ None, # First ban succeeds JailOperationError("Ban failed"), None, # Third ban succeeds ] executor = BanExecutor(ban_ip) ips = ["192.168.1.1", "10.0.0.1", "172.16.0.1"] successful, failed, error = await executor.ban_ips( "/var/run/fail2ban/fail2ban.sock", "blocklist-import", ips, ) assert successful == 2 assert failed == 1 assert error == "Ban failed" assert ban_ip.call_count == 3 # Continues after operation error @pytest.mark.asyncio async def test_ban_ips_empty_list(self) -> None: """Test banning empty list of IPs.""" ban_ip = AsyncMock() executor = BanExecutor(ban_ip) successful, failed, error = await executor.ban_ips( "/var/run/fail2ban/fail2ban.sock", "blocklist-import", [], ) assert successful == 0 assert failed == 0 assert error is None assert ban_ip.call_count == 0 class TestBlocklistImportWorkflow: """Test BlocklistImportWorkflow orchestrator.""" @pytest.mark.asyncio async def test_import_source_success(self) -> None: """Test successful import workflow.""" from app.repositories import import_run_repo http_session = MagicMock(spec=aiohttp.ClientSession) response = AsyncMock() response.status = 200 response.text = AsyncMock(return_value="192.168.1.1\n10.0.0.1\n# Comment") http_session.get = MagicMock(return_value=AsyncMock(__aenter__=AsyncMock(return_value=response))) ban_ip = AsyncMock() log_result = AsyncMock() workflow = BlocklistImportWorkflow(http_session, ban_ip, log_result) source = BlocklistSource( id=1, name="Test Source", url="https://example.com/blocklist.txt", enabled=True, created_at="2026-04-27T00:00:00Z", updated_at="2026-04-27T00:00:00Z", ) db = MagicMock() db.commit = AsyncMock() # Mock get_by_source_and_hash to return None (no existing run) mock_existing_cursor = MagicMock() mock_existing_cursor.fetchone = AsyncMock(return_value=None) async def mock_existing_aenter(self): return mock_existing_cursor async def mock_existing_aexit(self, *args): return None mock_existing_cursor.__aenter__ = mock_existing_aenter mock_existing_cursor.__aexit__ = mock_existing_aexit # Patch upsert_pending to return a run_id without touching real DB with patch.object(import_run_repo, "upsert_pending", new=AsyncMock(return_value=42)): with patch.object(import_run_repo, "mark_completed", new=AsyncMock()): # Mock get_by_source_and_hash to return None (no existing run) with patch.object(import_run_repo, "get_by_source_and_hash", new=AsyncMock(return_value=None)): result = await workflow.import_source(source, "/var/run/fail2ban/fail2ban.sock", db) assert result.source_id == 1 assert result.ips_imported == 2 assert result.ips_skipped == 0 assert result.error is None assert ban_ip.call_count == 2 @pytest.mark.asyncio async def test_import_source_download_error(self) -> None: """Test import workflow with download error.""" http_session = MagicMock(spec=aiohttp.ClientSession) http_session.get = MagicMock( side_effect=aiohttp.ClientError("Connection failed") ) ban_ip = AsyncMock() log_result = AsyncMock() workflow = BlocklistImportWorkflow(http_session, ban_ip, log_result) source = BlocklistSource( id=1, name="Test Source", url="https://example.com/blocklist.txt", enabled=True, created_at="2026-04-27T00:00:00Z", updated_at="2026-04-27T00:00:00Z", ) db = MagicMock() result = await workflow.import_source(source, "/var/run/fail2ban/fail2ban.sock", db) assert result.source_id == 1 assert result.ips_imported == 0 assert result.ips_skipped == 0 assert "Connection failed" in result.error or result.error is not None assert log_result.await_count == 1 @pytest.mark.asyncio async def test_import_source_http_non_200(self) -> None: """Test import workflow with non-200 HTTP status.""" http_session = MagicMock(spec=aiohttp.ClientSession) response = AsyncMock() response.status = 404 response.text = AsyncMock(return_value="Not Found") http_session.get = MagicMock(return_value=AsyncMock(__aenter__=AsyncMock(return_value=response))) ban_ip = AsyncMock() log_result = AsyncMock() workflow = BlocklistImportWorkflow(http_session, ban_ip, log_result) source = BlocklistSource( id=1, name="Test Source", url="https://example.com/blocklist.txt", enabled=True, created_at="2026-04-27T00:00:00Z", updated_at="2026-04-27T00:00:00Z", ) db = MagicMock() result = await workflow.import_source(source, "/var/run/fail2ban/fail2ban.sock", db) assert result.source_id == 1 assert result.ips_imported == 0 assert result.ips_skipped == 0 assert result.error == "HTTP 404"