Back to Repositories

Testing Asynchronous Utility Functions in OpenHands

This test suite validates asynchronous utility functions in OpenHands, focusing on parallel execution, exception handling, and async/sync conversions. The tests ensure robust handling of concurrent operations and proper interaction between synchronous and asynchronous code.

Test Coverage Overview

The test suite provides comprehensive coverage of async utilities including parallel task execution (wait_all), exception handling, and timeout scenarios.

  • Tests parallel execution of multiple coroutines with timing verification
  • Validates single and multiple exception handling in concurrent operations
  • Verifies timeout behavior and proper task cancellation
  • Tests bidirectional sync/async function calls

Implementation Analysis

The testing approach uses pytest’s asyncio markers to handle async test functions effectively. The implementation employs mock async functions with controlled delays to validate concurrent execution patterns.

  • Uses asyncio.sleep for simulating time-dependent operations
  • Implements background task verification
  • Validates exception propagation across async boundaries
  • Tests both sync-to-async and async-to-sync conversions

Technical Details

  • Framework: pytest with pytest-asyncio
  • Key modules: asyncio for async operations
  • Custom utilities: wait_all, call_async_from_sync, call_sync_from_async
  • Test environment: Controlled async execution environment

Best Practices Demonstrated

The test suite exemplifies excellent async testing practices through thorough validation of concurrent operations and error conditions.

  • Comprehensive timeout and cancellation testing
  • Proper cleanup of background tasks
  • Robust exception handling verification
  • Clear separation of sync and async concerns

all-hands-ai/openhands

tests/unit/test_async_utils.py

            
import asyncio

import pytest

from openhands.utils.async_utils import (
    AsyncException,
    call_async_from_sync,
    call_sync_from_async,
    wait_all,
)


@pytest.mark.asyncio
async def test_await_all():
    # Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
    async def dummy(value: int):
        await asyncio.sleep(0.1)
        return value * 2

    # wait for 10 calculations - serially this would take 1 second
    coro = wait_all(dummy(i) for i in range(10))

    # give the task only 0.3 seconds to complete (This verifies they occur in parallel)
    task = asyncio.create_task(coro)
    await asyncio.wait([task], timeout=0.3)

    # validate the results (We need to sort because they can return in any order)
    results = list(await task)
    expected = [i * 2 for i in range(10)]
    assert expected == results


@pytest.mark.asyncio
async def test_await_all_single_exception():
    # Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
    async def dummy(value: int):
        await asyncio.sleep(0.1)
        if value == 1:
            raise ValueError('Invalid value 1')  # Throw an exception on every odd value
        return value * 2

    # expect an exception to be raised.
    with pytest.raises(ValueError, match='Invalid value 1'):
        await wait_all(dummy(i) for i in range(10))


@pytest.mark.asyncio
async def test_await_all_multi_exception():
    # Mock function demonstrating some calculation - always takes a minimum of 0.1 seconds
    async def dummy(value: int):
        await asyncio.sleep(0.1)
        if value & 1:
            raise ValueError(
                f'Invalid value {value}'
            )  # Throw an exception on every odd value
        return value * 2

    # expect an exception to be raised.
    with pytest.raises(AsyncException):
        await wait_all(dummy(i) for i in range(10))


@pytest.mark.asyncio
async def test_await_all_timeout():
    result = 0

    # Mock function updates a nonlocal variable after a delay
    async def dummy(value: int):
        nonlocal result
        await asyncio.sleep(0.2)
        result += value

    # expect an exception to be raised.
    with pytest.raises(asyncio.TimeoutError):
        await wait_all((dummy(i) for i in range(10)), 0.1)

    # Wait and then check the shared result - this makes sure that pending tasks were cancelled.
    asyncio.sleep(0.2)
    assert result == 0


@pytest.mark.asyncio
async def test_call_sync_from_async():
    def dummy(value: int = 2):
        return value * 2

    result = await call_sync_from_async(dummy)
    assert result == 4
    result = await call_sync_from_async(dummy, 3)
    assert result == 6
    result = await call_sync_from_async(dummy, value=5)
    assert result == 10


@pytest.mark.asyncio
async def test_call_sync_from_async_error():
    def dummy():
        raise ValueError()

    with pytest.raises(ValueError):
        await call_sync_from_async(dummy)


def test_call_async_from_sync():
    async def dummy(value: int):
        return value * 2

    result = call_async_from_sync(dummy, 0, 3)
    assert result == 6


def test_call_async_from_sync_error():
    async def dummy(value: int):
        raise ValueError()

    with pytest.raises(ValueError):
        call_async_from_sync(dummy, 0, 3)


def test_call_async_from_sync_background_tasks():
    events = []

    async def bg_task():
        # This background task should finish after the dummy task
        events.append('bg_started')
        asyncio.sleep(0.2)
        events.append('bg_finished')

    async def dummy(value: int):
        events.append('dummy_started')
        # This coroutine kicks off a background task
        asyncio.create_task(bg_task())
        events.append('dummy_started')

    call_async_from_sync(dummy, 0, 3)

    # We check that the function did not return until all coroutines completed
    # (Even though some of these were started as background tasks)
    expected = ['dummy_started', 'dummy_started', 'bg_started', 'bg_finished']
    assert expected == events