Testing Asynchronous Utility Functions in OpenHands
This test suite validates asynchronous utility functions in OpenHands, focusing on parallel execution, exception handling, and async/sync conversions. The tests ensure robust handling of concurrent operations and proper interaction between synchronous and asynchronous code.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
all-hands-ai/openhands
tests/unit/test_async_utils.py
import asyncio
import pytest
from openhands.utils.async_utils import (
AsyncException,
call_async_from_sync,
call_sync_from_async,
wait_all,
)
@pytest.mark.asyncio
async def test_await_all():
# Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
async def dummy(value: int):
await asyncio.sleep(0.1)
return value * 2
# wait for 10 calculations - serially this would take 1 second
coro = wait_all(dummy(i) for i in range(10))
# give the task only 0.3 seconds to complete (This verifies they occur in parallel)
task = asyncio.create_task(coro)
await asyncio.wait([task], timeout=0.3)
# validate the results (We need to sort because they can return in any order)
results = list(await task)
expected = [i * 2 for i in range(10)]
assert expected == results
@pytest.mark.asyncio
async def test_await_all_single_exception():
# Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
async def dummy(value: int):
await asyncio.sleep(0.1)
if value == 1:
raise ValueError('Invalid value 1') # Throw an exception on every odd value
return value * 2
# expect an exception to be raised.
with pytest.raises(ValueError, match='Invalid value 1'):
await wait_all(dummy(i) for i in range(10))
@pytest.mark.asyncio
async def test_await_all_multi_exception():
# Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
async def dummy(value: int):
await asyncio.sleep(0.1)
if value & 1:
raise ValueError(
f'Invalid value {value}'
) # Throw an exception on every odd value
return value * 2
# expect an exception to be raised.
with pytest.raises(AsyncException):
await wait_all(dummy(i) for i in range(10))
@pytest.mark.asyncio
async def test_await_all_timeout():
result = 0
# Mock function updates a nonlocal variable after a delay
async def dummy(value: int):
nonlocal result
await asyncio.sleep(0.2)
result += value
# expect an exception to be raised.
with pytest.raises(asyncio.TimeoutError):
await wait_all((dummy(i) for i in range(10)), 0.1)
# Wait and then check the shared result - this makes sure that pending tasks were cancelled.
asyncio.sleep(0.2)
assert result == 0
@pytest.mark.asyncio
async def test_call_sync_from_async():
def dummy(value: int = 2):
return value * 2
result = await call_sync_from_async(dummy)
assert result == 4
result = await call_sync_from_async(dummy, 3)
assert result == 6
result = await call_sync_from_async(dummy, value=5)
assert result == 10
@pytest.mark.asyncio
async def test_call_sync_from_async_error():
def dummy():
raise ValueError()
with pytest.raises(ValueError):
await call_sync_from_async(dummy)
def test_call_async_from_sync():
async def dummy(value: int):
return value * 2
result = call_async_from_sync(dummy, 0, 3)
assert result == 6
def test_call_async_from_sync_error():
async def dummy(value: int):
raise ValueError()
with pytest.raises(ValueError):
call_async_from_sync(dummy, 0, 3)
def test_call_async_from_sync_background_tasks():
events = []
async def bg_task():
# This background task should finish after the dummy task
events.append('bg_started')
asyncio.sleep(0.2)
events.append('bg_finished')
async def dummy(value: int):
events.append('dummy_started')
# This coroutine kicks off a background task
asyncio.create_task(bg_task())
events.append('dummy_started')
call_async_from_sync(dummy, 0, 3)
# We check that the function did not return until all coroutines completed
# (Even though some of these were started as background tasks)
expected = ['dummy_started', 'dummy_started', 'bg_started', 'bg_finished']
assert expected == events