TASK-028: Add exception logging to fire-and-forget asyncio.create_task()

- Create logged_task() helper in backend/app/utils/async_utils.py to wrap
  fire-and-forget coroutines with exception logging
- Ensures unhandled task exceptions are always logged to structlog with the
  task name, instead of surfacing only as asyncio's generic "Task exception
  was never retrieved" message when the task is garbage-collected
- Update ban_service.py to use logged_task() for geo_cache.lookup_batch()
  background resolution
- Add comprehensive tests for logged_task() in test_async_utils.py
- Document fire-and-forget task conventions in Backend-Development.md

The logged_task() wrapper catches any Exception raised in a background task,
logs it with full traceback context and task name, and never re-raises it.
Cancellation (asyncio.CancelledError) is not caught and still propagates.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-04-26 15:17:30 +02:00
parent 46fa7c78bc
commit 5d24780c63
5 changed files with 158 additions and 37 deletions

View File

@@ -41,6 +41,7 @@ from app.models.ban import (
from app.repositories import fail2ban_db_repo
from app.repositories import history_archive_repo as default_history_archive_repo
from app.services.fail2ban_metadata_service import default_fail2ban_metadata_service
from app.utils.async_utils import logged_task
from app.utils.constants import (
DEFAULT_PAGE_SIZE,
FAIL2BAN_SOCKET_TIMEOUT,
@@ -611,8 +612,11 @@ async def bans_by_country(
if geo_cache is not None:
# Fire-and-forget: lookup_batch handles rate-limiting / retries.
# The dirty-set flush task persists results to the DB.
asyncio.create_task( # noqa: RUF006
geo_cache.lookup_batch(uncached, http_session, db=app_db),
asyncio.create_task(
logged_task(
geo_cache.lookup_batch(uncached, http_session, db=app_db),
"geo_bans_by_country",
),
name="geo_bans_by_country",
)
elif geo_enricher is not None and unique_ips:

View File

@@ -8,12 +8,17 @@ from __future__ import annotations
import asyncio
import functools
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, ParamSpec, TypeVar
from typing import Any, ParamSpec, TypeVar
import structlog
P = ParamSpec("P")
T = TypeVar("T")
log: structlog.stdlib.BoundLogger = structlog.get_logger()
DEFAULT_BLOCKING_EXECUTOR: ThreadPoolExecutor = ThreadPoolExecutor(
max_workers=16,
thread_name_prefix="bangui-blocking",
@@ -43,3 +48,27 @@ async def run_blocking(
func = functools.partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, func)
return await loop.run_in_executor(executor, func, *args)
async def logged_task(
    coro: Coroutine[Any, Any, Any],
    name: str,
) -> None:
    """Await ``coro`` and log, rather than propagate, any ``Exception`` it raises.

    Intended as the body of a fire-and-forget task, so that a failure is
    reported through the module logger together with the task name::

        asyncio.create_task(
            logged_task(some_coroutine(), "task_name"),
            name="task_name",
        )

    The result of ``coro`` is discarded.

    Args:
        coro: The coroutine object to run.
        name: Label attached to the log event as ``task_name``.
    """
    try:
        await coro
    except Exception:  # noqa: BLE001
        # Deliberately broad: this is the top-level boundary of a background
        # task. asyncio.CancelledError derives from BaseException, so
        # cancellation is not caught here and still propagates.
        log.exception("background_task_failed", task_name=name)

View File

@@ -5,8 +5,12 @@ from __future__ import annotations
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from unittest import mock
from app.utils.async_utils import run_blocking
import pytest
import structlog
from app.utils.async_utils import logged_task, run_blocking
async def test_run_blocking_executes_callable_in_thread() -> None:
@@ -46,3 +50,67 @@ async def test_run_blocking_does_not_block_event_loop() -> None:
await asyncio.sleep(0)
result = await task
assert result == "done"
async def test_logged_task_awaits_coroutine() -> None:
    """logged_task should run the wrapped coroutine to completion without logging."""
    completed: list[str] = []

    async def dummy_coro() -> str:
        await asyncio.sleep(0)
        # Record completion so the test fails if the coroutine is never awaited;
        # checking only "nothing was logged" would pass in that case too.
        completed.append("result")
        return "result"

    with mock.patch("app.utils.async_utils.log") as mock_log:
        await logged_task(dummy_coro(), "test_task")

    assert completed == ["result"]
    mock_log.exception.assert_not_called()
async def test_logged_task_catches_exception_and_logs() -> None:
    """A failing coroutine is swallowed and reported once with its task name."""

    class BoomError(Exception):
        """Private error type raised only by this test's coroutine."""

    async def always_fails() -> None:
        raise BoomError("task failed")

    task_label = "failing_task"

    with mock.patch("app.utils.async_utils.log") as patched_log:
        # Must not raise: logged_task is expected to absorb the error.
        await logged_task(always_fails(), task_label)

    patched_log.exception.assert_called_once_with(
        "background_task_failed",
        task_name=task_label,
    )
async def test_logged_task_with_asyncio_create_task() -> None:
    """A logged_task coroutine scheduled via asyncio.create_task runs to the end."""
    finished: list[str] = []

    async def background_work() -> None:
        await asyncio.sleep(0.01)
        finished.append("done")

    wrapped = logged_task(background_work(), "bg_task")

    with mock.patch("app.utils.async_utils.log"):
        # Schedule as a real task and wait for it, as production callers would
        # if they kept the task handle.
        await asyncio.create_task(wrapped, name="bg_task")

    assert finished == ["done"]
async def test_logged_task_preserves_exception_info() -> None:
    """logged_task should log while the original exception is still being handled.

    ``log.exception`` reads the active exception at call time to attach the
    traceback, so the test records ``sys.exc_info()`` from inside the mocked
    call and checks that it is the error raised by the coroutine.
    """
    import sys

    active_errors: list[BaseException | None] = []

    def record_active_exception(*_args: object, **_kwargs: object) -> None:
        # Runs synchronously inside logged_task's ``except`` block.
        active_errors.append(sys.exc_info()[1])

    async def failing_coro() -> None:
        raise ValueError("original error message")

    with mock.patch("app.utils.async_utils.log") as mock_log:
        mock_log.exception.side_effect = record_active_exception
        await logged_task(failing_coro(), "test_task")

    mock_log.exception.assert_called_once()
    args, kwargs = mock_log.exception.call_args
    assert args[0] == "background_task_failed"
    assert kwargs["task_name"] == "test_task"

    # The exception being handled at log time is the one the coroutine raised.
    assert len(active_errors) == 1
    assert isinstance(active_errors[0], ValueError)
    assert str(active_errors[0]) == "original error message"