Back to Repositories

Testing Asynchronous I/O Optimizer Tensor Swapping in Microsoft DeepSpeed

This test suite validates DeepSpeed’s asynchronous I/O functionality for swapping optimizer tensors between memory and NVMe storage devices. It implements multiprocessing tests to verify both basic and handle-based AIO operations with configurable read/write capabilities.

Test Coverage Overview

The test suite provides comprehensive coverage of DeepSpeed’s AIO operations, focusing on tensor swapping mechanisms.

  • Tests both basic and handle-based AIO implementations
  • Validates read and write operations to NVMe storage
  • Verifies multiprocessing functionality
  • Tests argument validation and processing

Implementation Analysis

The testing approach utilizes Python’s multiprocessing framework to validate asynchronous I/O operations. It implements a spawned process model for isolation and uses conditional logic to test different AIO implementations based on input arguments.

  • Multiprocessing with ‘spawn’ method for process isolation
  • Conditional testing between basic and handle-based implementations
  • Argument-driven test execution flow

Technical Details

  • Python multiprocessing library
  • Custom AIO implementation modules (ds_aio_basic, ds_aio_handle)
  • Argument validation framework
  • NVMe storage interface testing
  • Process spawning configuration

Best Practices Demonstrated

The test implementation showcases several testing best practices for distributed systems and I/O operations.

  • Proper process isolation through spawn method
  • Modular test implementation with separate concerns
  • Robust argument validation
  • Flexible testing configuration
  • Clear separation between basic and advanced functionality testing

microsoft/deepspeed

deepspeed/nvme/test_ds_aio.py

            
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""

import multiprocessing as mp
from .ds_aio_basic import aio_basic_multiprocessing
from .ds_aio_handle import aio_handle_multiprocessing
from .ds_aio_args import get_validated_args


def ds_io_main():
    print(f'Testing deepspeed_aio python frontend')

    args = get_validated_args()
    mp.set_start_method('spawn')
    multiprocess_function = aio_handle_multiprocessing if args.handle else aio_basic_multiprocessing
    multiprocess_function(args, args.read)


if __name__ == "__main__":
    ds_io_main()