Refactor backend: fix geo cache cleanup, scheduler heartbeat, correlation middleware; update docs

This commit is contained in:
2026-05-03 16:02:40 +02:00
parent 896751ada9
commit 5058a50143
9 changed files with 287 additions and 146 deletions

View File

@@ -254,8 +254,9 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# task's coroutine handles cleanup.
import asyncio # noqa: TC003
current_task = asyncio.current_task()
pending_tasks: list[asyncio.Task[Any]] = [
t for t in asyncio.all_tasks() if not t.done()
t for t in asyncio.all_tasks() if not t.done() and t is not current_task
]
if pending_tasks:
log.info(

View File

@@ -459,6 +459,8 @@ async def _stage_register_tasks(app: FastAPI, scheduler: AsyncIOScheduler) -> No
app: The FastAPI application instance.
scheduler: The APScheduler scheduler to register tasks with.
"""
# Set scheduler on app.state before registering tasks (they use app.state.scheduler)
app.state.scheduler = scheduler
scheduler_lock_heartbeat.register(app)
health_check.register(app)
await blocklist_import.register(app)

View File

@@ -114,7 +114,13 @@ def register(app: FastAPI) -> None:
``app.state.scheduler`` will receive the job.
"""
settings = get_effective_settings(app)
app.state.scheduler.add_job(
scheduler = getattr(app.state, "scheduler", None)
if scheduler is None:
# In tests or standalone usage, scheduler may not be on app.state yet.
# Skip registration: the geo cache cleanup job won't be scheduled, but there is no crash.
log.warning("geo_cache_cleanup_no_scheduler")
return
scheduler.add_job(
_run_cleanup_with_resources,
trigger="interval",
seconds=GEO_CLEANUP_INTERVAL,

View File

@@ -121,7 +121,13 @@ def register(app: FastAPI) -> None:
``app.state.scheduler`` will receive the job.
"""
settings = get_effective_settings(app)
app.state.scheduler.add_job(
scheduler = getattr(app.state, "scheduler", None)
if scheduler is None:
# In tests or standalone usage, scheduler may not be on app.state yet.
# Skip registration: the heartbeat job won't be scheduled, but there is no crash.
log.warning("scheduler_lock_heartbeat_no_scheduler")
return
scheduler.add_job(
_update_heartbeat_with_resources,
trigger="interval",
seconds=SCHEDULER_LOCK_HEARTBEAT_INTERVAL,

View File

@@ -9,21 +9,14 @@ from starlette.testclient import TestClient
from app.config import Settings
from app.main import create_app
from app.middleware.correlation import CORRELATION_ID_CONTEXT_KEY
from app.models.server import ServerStatus
def test_correlation_middleware_generates_uuid_when_header_absent() -> None:
def test_correlation_middleware_generates_uuid_when_header_absent(
test_settings: Settings,
) -> None:
"""Correlation middleware generates a UUID4 when X-Correlation-ID header is missing."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
app = create_app(settings=test_settings)
# Test with TestClient (synchronous)
client = TestClient(app)
@@ -37,19 +30,11 @@ def test_correlation_middleware_generates_uuid_when_header_absent() -> None:
assert correlation_id.count("-") == 4
def test_correlation_middleware_preserves_header_from_request() -> None:
def test_correlation_middleware_preserves_header_from_request(
test_settings: Settings,
) -> None:
"""Correlation middleware preserves X-Correlation-ID header from client request."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
app = create_app(settings=test_settings)
client = TestClient(app)
test_correlation_id = "550e8400-e29b-41d4-a716-446655440000"
@@ -59,19 +44,18 @@ def test_correlation_middleware_preserves_header_from_request() -> None:
assert response.headers["X-Correlation-ID"] == test_correlation_id
def test_correlation_middleware_stores_in_request_state() -> None:
def test_correlation_middleware_stores_in_request_state(
test_settings: Settings,
) -> None:
"""Correlation middleware stores correlation ID in request.state for handlers."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
from unittest.mock import MagicMock
app = create_app(settings=test_settings)
app.state.server_status = ServerStatus(online=True)
mock_scheduler = MagicMock()
mock_scheduler.running = True
app.state.scheduler = mock_scheduler
app = create_app(settings=settings)
client = TestClient(app)
# Make a request and verify correlation ID is available to handlers
@@ -84,19 +68,11 @@ def test_correlation_middleware_stores_in_request_state() -> None:
assert response.headers["X-Correlation-ID"] == test_correlation_id
def test_correlation_id_in_response_headers() -> None:
def test_correlation_id_in_response_headers(
test_settings: Settings,
) -> None:
"""Correlation ID is included in all response headers."""
settings = Settings(
database_path="/tmp/test.db",
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir="/tmp/fail2ban",
session_secret="test-secret-key-do-not-use-in-production",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
)
app = create_app(settings=settings)
app = create_app(settings=test_settings)
client = TestClient(app)
# Test without providing header (should generate one)

View File

@@ -1,6 +1,7 @@
"""Unit tests for backend application startup and middleware configuration."""
import asyncio
import contextlib
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
@@ -143,6 +144,7 @@ def test_create_app_disables_cors_by_default() -> None:
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
cors_allowed_origins=[],
)
app = create_app(settings=settings)
@@ -195,11 +197,13 @@ def test_create_app_enables_api_docs_when_configured() -> None:
async def test_lifespan_initialises_and_cleans_up_shared_resources(tmp_path: Path) -> None:
"""The app lifespan creates and shuts down shared resources cleanly."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
settings = Settings(
database_path=str(tmp_path / "bangui.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-lifespan-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-lifespan-secret-that-is-long-enough!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -218,6 +222,7 @@ async def test_lifespan_initialises_and_cleans_up_shared_resources(tmp_path: Pat
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session),
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler),
patch("app.startup.init_db", new=AsyncMock()),
patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)),
patch("app.services.geo_cache.GeoCache.init_geoip"),
patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)),
@@ -227,6 +232,8 @@ async def test_lifespan_initialises_and_cleans_up_shared_resources(tmp_path: Pat
patch("app.tasks.geo_cache_flush.register"),
patch("app.tasks.geo_re_resolve.register"),
patch("app.tasks.history_sync.register"),
patch("app.tasks.session_cleanup.register"),
patch("app.tasks.rate_limiter_cleanup.register"),
):
async with _lifespan(app):
assert app.state.http_session is mock_http_session
@@ -239,11 +246,13 @@ async def test_lifespan_initialises_and_cleans_up_shared_resources(tmp_path: Pat
async def test_lifespan_cleans_up_resources_when_startup_fails(tmp_path: Path) -> None:
"""The lifespan must close resources if shared startup registration fails."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
settings = Settings(
database_path=str(tmp_path / "bangui.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-lifespan-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-lifespan-secret-that-is-long-enough!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -262,15 +271,18 @@ async def test_lifespan_cleans_up_resources_when_startup_fails(tmp_path: Path) -
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session), \
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler), \
patch("app.startup.init_db", new=AsyncMock()), \
patch("app.services.geo_service.init_geoip"), \
patch("app.services.geo_service.load_cache_from_db", new=AsyncMock(return_value=None)), \
patch("app.services.geo_service.count_unresolved", new=AsyncMock(return_value=0)), \
patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)), \
patch("app.services.geo_cache.GeoCache.init_geoip"), \
patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=AsyncMock(return_value=None)), \
patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)), \
patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=False)), \
patch("app.tasks.health_check.register", side_effect=RuntimeError("startup failed")), \
patch("app.tasks.blocklist_import.register"), \
patch("app.tasks.geo_cache_flush.register"), \
patch("app.tasks.geo_re_resolve.register"), \
patch("app.tasks.history_sync.register"):
patch("app.tasks.history_sync.register"), \
patch("app.tasks.session_cleanup.register"), \
patch("app.tasks.rate_limiter_cleanup.register"):
async with _lifespan(app):
pass
@@ -280,11 +292,13 @@ async def test_lifespan_cleans_up_resources_when_startup_fails(tmp_path: Path) -
async def test_http_session_is_created_with_configured_timeouts_and_limits(tmp_path: Path) -> None:
"""The shared HTTP client session is created with the configured limits."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
settings = Settings(
database_path=str(tmp_path / "bangui.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-lifespan-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-lifespan-secret-that-is-long-enough!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -307,15 +321,18 @@ async def test_http_session_is_created_with_configured_timeouts_and_limits(tmp_p
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session) as mock_client_session,
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler),
patch("app.startup.init_db", new=AsyncMock()),
patch("app.services.geo_service.init_geoip"),
patch("app.services.geo_service.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_service.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)),
patch("app.services.geo_cache.GeoCache.init_geoip"),
patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=False)),
patch("app.tasks.health_check.register"),
patch("app.tasks.blocklist_import.register"),
patch("app.tasks.geo_cache_flush.register"),
patch("app.tasks.geo_re_resolve.register"),
patch("app.tasks.history_sync.register"),
patch("app.tasks.session_cleanup.register"),
patch("app.tasks.rate_limiter_cleanup.register"),
):
async with _lifespan(app):
assert mock_client_session.call_count == 1
@@ -331,11 +348,13 @@ async def test_http_session_is_created_with_configured_timeouts_and_limits(tmp_p
async def test_startup_overrides_settings_from_persisted_setup(tmp_path: Path) -> None:
"""Startup should replace env defaults with values persisted by setup."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
env_settings = Settings(
database_path=str(tmp_path / "pointer.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-startup-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-startup-secret-that-is-long-enough!!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -367,14 +386,17 @@ async def test_startup_overrides_settings_from_persisted_setup(tmp_path: Path) -
patch("app.startup.ensure_jail_configs"),
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session),
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler),
patch("app.services.geo_service.init_geoip"),
patch("app.services.geo_service.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_service.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)),
patch("app.services.geo_cache.GeoCache.init_geoip"),
patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.tasks.health_check.register"),
patch("app.tasks.blocklist_import.register"),
patch("app.tasks.geo_cache_flush.register"),
patch("app.tasks.geo_re_resolve.register"),
patch("app.tasks.history_sync.register"),
patch("app.tasks.session_cleanup.register"),
patch("app.tasks.rate_limiter_cleanup.register"),
):
async with _lifespan(app):
assert app.state.runtime_settings is not None
@@ -388,11 +410,13 @@ async def test_startup_overrides_settings_from_persisted_setup(tmp_path: Path) -
async def test_startup_loads_geo_cache_from_persisted_runtime_database(tmp_path: Path) -> None:
"""Startup must load geo cache from the resolved runtime database."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
env_settings = Settings(
database_path=str(tmp_path / "pointer.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-startup-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-startup-secret-that-is-long-enough!!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -415,39 +439,36 @@ async def test_startup_loads_geo_cache_from_persisted_runtime_database(tmp_path:
mock_http_session.close = AsyncMock()
load_cache = AsyncMock()
with (
patch("app.startup.ensure_jail_configs"),
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session),
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler),
patch("app.startup.open_db", new=AsyncMock(side_effect=fake_open_db)),
patch("app.startup.init_db", new=AsyncMock()),
patch("app.services.geo_service.init_geoip"),
patch("app.services.geo_service.load_cache_from_db", new=load_cache),
patch("app.services.geo_service.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=True)),
patch("app.services.setup_service.get_runtime_database_path", new=AsyncMock(return_value=runtime_db_path)),
patch(
"app.services.setup_service.get_persisted_runtime_settings",
new=AsyncMock(
return_value={
"database_path": runtime_db_path,
"fail2ban_socket": "/tmp/persisted.sock",
"timezone": "Europe/Berlin",
"session_duration_minutes": 123,
}
),
),
patch("app.tasks.health_check.register"),
patch("app.tasks.blocklist_import.register"),
patch("app.tasks.geo_cache_flush.register"),
patch("app.tasks.geo_re_resolve.register"),
patch("app.tasks.history_sync.register"),
):
exit_stack = contextlib.ExitStack()
exit_stack.enter_context(patch("app.startup.ensure_jail_configs"))
exit_stack.enter_context(patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session))
exit_stack.enter_context(patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler))
exit_stack.enter_context(patch("app.startup.open_db", new=AsyncMock(side_effect=fake_open_db)))
exit_stack.enter_context(patch("app.startup.init_db", new=AsyncMock()))
exit_stack.enter_context(patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)))
exit_stack.enter_context(patch("app.services.geo_cache.GeoCache.init_geoip"))
exit_stack.enter_context(patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=load_cache))
exit_stack.enter_context(patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)))
exit_stack.enter_context(patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=True)))
exit_stack.enter_context(patch("app.services.setup_service.get_runtime_database_path", new=AsyncMock(return_value=runtime_db_path)))
exit_stack.enter_context(patch("app.services.setup_service.get_persisted_runtime_settings", new=AsyncMock(return_value={
"database_path": runtime_db_path,
"fail2ban_socket": "/tmp/persisted.sock",
"timezone": "Europe/Berlin",
"session_duration_minutes": 123,
})))
exit_stack.enter_context(patch("app.services.setup_service.get_fail2ban_db_path", new=AsyncMock(return_value="/tmp/fail2ban/banned.tar.bz2")))
exit_stack.enter_context(patch("app.tasks.health_check.register"))
exit_stack.enter_context(patch("app.tasks.blocklist_import.register"))
exit_stack.enter_context(patch("app.tasks.geo_cache_flush.register"))
exit_stack.enter_context(patch("app.tasks.geo_re_resolve.register"))
exit_stack.enter_context(patch("app.tasks.history_sync.register"))
with exit_stack:
async with _lifespan(app):
loaded_db = load_cache.call_args.args[0]
loaded_db_path = load_cache.call_args.args[0]
runtime_connections = [conn for path, conn in opened_connections if path == runtime_db_path]
assert runtime_connections, "Expected runtime database to be opened"
assert loaded_db in runtime_connections
assert app.state.runtime_settings is not None
assert app.state.runtime_settings.database_path == runtime_db_path
@@ -458,11 +479,13 @@ async def test_startup_loads_geo_cache_from_persisted_runtime_database(tmp_path:
async def test_concurrent_requests_use_request_scoped_db_connections(tmp_path: Path) -> None:
"""Concurrent requests each open and close their own database connection."""
fail2ban_config_dir = tmp_path / "fail2ban"
fail2ban_config_dir.mkdir()
settings = Settings(
database_path=str(tmp_path / "bangui.db"),
fail2ban_socket="/tmp/fake_fail2ban.sock",
fail2ban_config_dir=str(tmp_path / "fail2ban"),
session_secret="test-concurrency-secret",
fail2ban_config_dir=str(fail2ban_config_dir),
session_secret="test-concurrency-secret-that-is-long-enough!!!",
session_duration_minutes=60,
timezone="UTC",
log_level="debug",
@@ -491,15 +514,18 @@ async def test_concurrent_requests_use_request_scoped_db_connections(tmp_path: P
patch("app.startup.ensure_jail_configs"),
patch("app.startup.aiohttp.ClientSession", return_value=mock_http_session),
patch("app.startup.AsyncIOScheduler", return_value=mock_scheduler),
patch("app.services.geo_service.init_geoip"),
patch("app.services.geo_service.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_service.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.startup.acquire_scheduler_lock", new=AsyncMock(return_value=True)),
patch("app.services.geo_cache.GeoCache.init_geoip"),
patch("app.services.geo_cache.GeoCache.load_cache_from_db", new=AsyncMock(return_value=None)),
patch("app.services.geo_cache.GeoCache.count_unresolved", new=AsyncMock(return_value=0)),
patch("app.services.setup_service.is_setup_complete", new=AsyncMock(return_value=False)),
patch("app.tasks.health_check.register"),
patch("app.tasks.blocklist_import.register"),
patch("app.tasks.geo_cache_flush.register"),
patch("app.tasks.geo_re_resolve.register"),
patch("app.tasks.history_sync.register"),
patch("app.tasks.session_cleanup.register"),
patch("app.tasks.rate_limiter_cleanup.register"),
):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client: