49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
"""Tests for async_utils."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from app.utils.async_utils import run_blocking
|
|
|
|
|
|
async def test_run_blocking_executes_callable_in_thread() -> None:
|
|
"""run_blocking should execute the provided callable and return its result."""
|
|
|
|
def blocking_add(x: int, y: int) -> int:
|
|
return x + y
|
|
|
|
result = await run_blocking(blocking_add, 3, 4)
|
|
assert result == 7
|
|
|
|
|
|
async def test_run_blocking_accepts_custom_executor() -> None:
|
|
"""run_blocking should use a provided executor when one is passed."""
|
|
|
|
def blocking_value() -> str:
|
|
return "ok"
|
|
|
|
executor = ThreadPoolExecutor(max_workers=1)
|
|
try:
|
|
result = await run_blocking(blocking_value, executor=executor)
|
|
assert result == "ok"
|
|
finally:
|
|
executor.shutdown(wait=True)
|
|
|
|
|
|
async def test_run_blocking_does_not_block_event_loop() -> None:
|
|
"""A blocking callable should not stall other async tasks."""
|
|
|
|
def sleep_block() -> str:
|
|
time.sleep(0.05)
|
|
return "done"
|
|
|
|
task = asyncio.create_task(run_blocking(sleep_block))
|
|
# Yield control to ensure the event loop can run other tasks while the
|
|
# blocking work executes in the thread pool.
|
|
await asyncio.sleep(0)
|
|
result = await task
|
|
assert result == "done"
|