"""Tests for async_utils.""" from __future__ import annotations import asyncio import time from concurrent.futures import ThreadPoolExecutor from app.utils.async_utils import run_blocking async def test_run_blocking_executes_callable_in_thread() -> None: """run_blocking should execute the provided callable and return its result.""" def blocking_add(x: int, y: int) -> int: return x + y result = await run_blocking(blocking_add, 3, 4) assert result == 7 async def test_run_blocking_accepts_custom_executor() -> None: """run_blocking should use a provided executor when one is passed.""" def blocking_value() -> str: return "ok" executor = ThreadPoolExecutor(max_workers=1) try: result = await run_blocking(blocking_value, executor=executor) assert result == "ok" finally: executor.shutdown(wait=True) async def test_run_blocking_does_not_block_event_loop() -> None: """A blocking callable should not stall other async tasks.""" def sleep_block() -> str: time.sleep(0.05) return "done" task = asyncio.create_task(run_blocking(sleep_block)) # Yield control to ensure the event loop can run other tasks while the # blocking work executes in the thread pool. await asyncio.sleep(0) result = await task assert result == "done"