Testing DataLoader Functionality and Batch Processing in DeepSpeed

This test suite validates DeepSpeed’s data loading and batch processing, focusing on the RepeatingLoader utility and the DataLoader drop_last setting. The tests verify that batches are formed, truncated, and iterated correctly when training runs through DeepSpeed’s engine in a distributed setup.

Test Coverage Overview

The test suite exercises the following aspects of DeepSpeed’s data handling:
  • RepeatingLoader functionality for continuous data iteration (a sketch follows this list)
  • DataLoader drop_last behavior with different batch sizes
  • Batch processing across distributed environments
  • Integration with DeepSpeed’s initialization and training workflow
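
As a minimal illustration of the first point, the sketch below shows the behavior that test_repeating_loader (in the file further down) asserts: RepeatingLoader wraps any finite iterable, typically a DataLoader, and restarts it once exhausted, so iteration never raises StopIteration.

# Minimal sketch of RepeatingLoader behavior, mirroring test_repeating_loader below.
from deepspeed.utils import RepeatingLoader

loader = RepeatingLoader([1, 2, 3])

# Iteration wraps around indefinitely: 1, 2, 3, 1, 2, 3, ...
assert [next(loader) for _ in range(6)] == [1, 2, 3, 1, 2, 3]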

Implementation Analysis

The tests use pytest parametrization to cover every combination of batch size and drop_last. The DistributedTest base class simulates a distributed environment for each case, and a SimpleModel trained on a random dataset provides the validation workload.

Key patterns include (a sketch of how they combine follows this list):
  • Parametrized test configurations
  • Distributed test environment setup
  • Integration with torch optimizers and DeepSpeed initialization
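
A rough sketch of how these patterns combine; the class name here is a placeholder, and the real parametrization and world_size appear in the test file below:

# pytest expands every (train_batch_size, drop_last) pair into its own test case,
# and the DistributedTest base class from the repo's unit.common helpers runs each
# case in a simulated distributed environment with `world_size` processes.
import pytest
from unit.common import DistributedTest

@pytest.mark.parametrize('train_batch_size, drop_last', [(1, True), (4, True), (1, False), (4, False)])
class TestPatternSketch(DistributedTest):  # placeholder name for illustration
    world_size = 1

    def test(self, train_batch_size, drop_last):
        # each parameter combination runs as an independent distributed test
        ...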

Technical Details

Testing infrastructure includes:
  • pytest framework for test organization
  • The DistributedTest harness from the repo’s unit.common test helpers
  • torch.optim.AdamW optimizer
  • Custom SimpleModel implementation
  • RepeatingLoader utility
  • Configurable batch sizes and drop_last settings (illustrated in the sketch after this list)
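
For reference, the drop_last flag has straightforward arithmetic consequences. A plain-PyTorch sketch using the same dataset size as the test (50 samples):

# With 50 samples and batch size 4, drop_last=True discards the trailing partial
# batch (12 full batches); drop_last=False keeps it as a final batch of 2 (13 batches).
import torch
from torch.utils.data import DataLoader, TensorDataset

dataset = TensorDataset(torch.randn(50, 10), torch.randn(50, 1))
assert len(DataLoader(dataset, batch_size=4, drop_last=True)) == 12
assert len(DataLoader(dataset, batch_size=4, drop_last=False)) == 13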

Best Practices Demonstrated

The test suite exemplifies robust testing practices through:
  • Comprehensive parameter coverage using pytest.mark.parametrize
  • Proper initialization and cleanup of the distributed environment
  • Explicit device management and memory handling (see the sketch after this list)
  • Clear separation of test configurations and execution logic
  • Integration testing with actual training workflow
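
The device-management point follows DeepSpeed’s accelerator abstraction, which the test body uses to move CPU-produced batches onto the active device. A small sketch, with a hypothetical helper name:

# Move every tensor in a batch to the current accelerator device (e.g. 'cuda:0'),
# matching the .to(get_accelerator().current_device_name()) calls in the test below.
from deepspeed.accelerator import get_accelerator

def to_accelerator(batch):  # hypothetical helper for illustration
    device = get_accelerator().current_device_name()
    return [t.to(device) for t in batch]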

microsoft/deepspeed

tests/unit/runtime/test_data.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

from deepspeed.utils import RepeatingLoader
import torch
import pytest
import deepspeed
from deepspeed.accelerator import get_accelerator
from unit.common import DistributedTest
from unit.simple_model import SimpleModel, random_dataset


def test_repeating_loader():
    loader = [1, 2, 3]
    loader = RepeatingLoader(loader)

    for idx in range(50):
        assert next(loader) == 1
        assert next(loader) == 2
        assert next(loader) == 3


@pytest.mark.parametrize('train_batch_size, drop_last', [(1, True), (4, True), (1, False), (4, False)])
class TestDataLoaderDropLast(DistributedTest):
    world_size = 1

    def test(self, train_batch_size, drop_last):
        config_dict = {"train_batch_size": train_batch_size, "dataloader_drop_last": drop_last, "steps_per_print": 1}
        hidden_dim = 10

        model = SimpleModel(hidden_dim)
        optimizer = torch.optim.AdamW(params=model.parameters())
        # TODO: no way to set DeepSpeedEngine.deepspeed_io params, need to use
        # pin_memory=False for cuda device
        train_dataset = random_dataset(total_samples=50,
                                       hidden_dim=hidden_dim,
                                       device=torch.device('cpu'),
                                       dtype=torch.float32)
        model, _, training_dataloader, _ = deepspeed.initialize(config=config_dict,
                                                                model=model,
                                                                training_data=train_dataset,
                                                                optimizer=optimizer)
        training_dataloader.num_local_io_workers = 0  # We can't do nested mp.pool
        for n, batch in enumerate(training_dataloader):
            x = batch[0].to(get_accelerator().current_device_name())
            y = batch[1].to(get_accelerator().current_device_name())
            loss = model(x, y)
            model.backward(loss)
            model.step()