Testing Model Unwrapping for Generation in DeepSpeed

This test suite validates model unwrapping for text generation under DeepSpeed's ZeRO stage 3. It checks that, inside the unwrap_model_for_generation context, parameters are gathered and forward hooks are removed, and that the hooks are back in place once the context exits.

Test Coverage Overview

The suite consists of a single distributed test, TestUnwrapModel, that exercises unwrap_model_for_generation on a ZeRO stage 3 engine.

Key areas tested include:
  • Parameter gathering verification during model unwrapping
  • Hook management before and after unwrapping
  • ZeRO stage 3 optimization compatibility
  • CPU offloading functionality

Implementation Analysis

The test enters unwrap_model_for_generation(engine) as a context manager, which temporarily unwraps the model for generation. It builds a SimpleModel with hidden_dim=100, initializes it through deepspeed.initialize, and runs across two processes (world_size = 2).

Key patterns include:
  • Distributed testing across multiple GPUs
  • Hook existence verification
  • Gathered-parameter validation: a weight's numel() must be non-zero inside the context
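
The context-manager pattern described in this section can be sketched without DeepSpeed. The snippet below is a minimal illustration, not DeepSpeed's implementation: ToyEngine, register_hooks, remove_hooks, and unwrapped are hypothetical names invented for this example. It shows the two properties the test asserts: hooks are absent inside the context and present again after it exits, including when the body raises.

```python
from contextlib import contextmanager


class ToyEngine:
    """Hypothetical stand-in for an engine that tracks registered forward hooks."""

    def __init__(self):
        self.forward_hooks = []

    def register_hooks(self):
        # Each string stands in for a real hook handle.
        self.forward_hooks = ["pre_forward", "post_forward"]

    def remove_hooks(self):
        self.forward_hooks = []


def hooks_exist(engine):
    # True if at least one hook is currently registered.
    return len(engine.forward_hooks) > 0


@contextmanager
def unwrapped(engine):
    """Temporarily remove hooks; restore them even if the body raises."""
    engine.remove_hooks()
    try:
        yield engine
    finally:
        engine.register_hooks()


engine = ToyEngine()
engine.register_hooks()
with unwrapped(engine):
    inside = hooks_exist(engine)  # hooks removed while unwrapped
after = hooks_exist(engine)       # hooks restored on exit
```

Putting the restore step in a finally block is the design choice that matters here: a failure during generation should not leave the engine without its hooks.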

Technical Details

Testing infrastructure includes:
  • DeepSpeed initialization with ZeRO stage 3 configuration
  • CPU parameter offloading with pinned memory
  • Automatic FP16/BF16 precision selection based on accelerator support
  • Custom hook verification utilities
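
The precision selection listed above can be isolated into a small helper. This is a sketch under assumptions: with_precision and BASE_CONFIG are hypothetical names, and the two boolean arguments stand in for the results of get_accelerator().is_fp16_supported() and get_accelerator().is_bf16_supported(), so the snippet runs without an accelerator. FP16 takes priority, and BF16 is used only when FP16 is unavailable.

```python
import copy

# Trimmed-down, illustrative version of a ZeRO stage 3 config with CPU offload.
BASE_CONFIG = {
    "train_batch_size": 2,
    "zero_optimization": {
        "stage": 3,
        "offload_param": {"device": "cpu", "pin_memory": True},
    },
}


def with_precision(base, fp16_supported, bf16_supported):
    """Return a copy of base with at most one precision block enabled."""
    cfg = copy.deepcopy(base)  # leave the shared base config untouched
    if fp16_supported:
        cfg["fp16"] = {"enabled": True}
    elif bf16_supported:
        cfg["bf16"] = {"enabled": True}
    return cfg


fp16_cfg = with_precision(BASE_CONFIG, fp16_supported=True, bf16_supported=True)
bf16_cfg = with_precision(BASE_CONFIG, fp16_supported=False, bf16_supported=True)
fp32_cfg = with_precision(BASE_CONFIG, fp16_supported=False, bf16_supported=False)
```

Returning a copy rather than mutating a module-level dictionary keeps the base config reusable across tests that need different precision settings.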

Best Practices Demonstrated

The test follows several practices that are useful when testing distributed training frameworks.

Notable practices include:
  • Explicit world size specification for distributed testing
  • Proper resource cleanup through context manager usage
  • Comprehensive state validation before and after operations
  • Clear separation of configuration and test logic

microsoft/deepspeed

tests/unit/runtime/zero/test_unwrap_model.py

            
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import deepspeed
from deepspeed.runtime.zero import unwrap_model_for_generation
from deepspeed.accelerator import get_accelerator

from unit.common import DistributedTest
from unit.simple_model import SimpleModel

config = {
    "train_batch_size": 2,
    "steps_per_print": 1,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 0.00015
        }
    },
    "zero_optimization": {
        "stage": 3,
        "stage3_param_persistence_threshold": 1,
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        }
    }
}

if get_accelerator().is_fp16_supported():
    config["fp16"] = {"enabled": True, "loss_scale": 138.}
elif get_accelerator().is_bf16_supported():
    config["bf16"] = {"enabled": True}


class TestUnwrapModel(DistributedTest):
    # gather across more than 1 gpu
    world_size = 2

    def test(self):

        def hooks_exist(engine):
            # Look for the forward hooks on the optimizer's parameter_offload
            # object if it has one, otherwise on the optimizer itself.
            assert engine.optimizer is not None, "expected the engine to have an optimizer"
            if hasattr(engine.optimizer, "parameter_offload"):
                optimizer_offload = engine.optimizer.parameter_offload
            else:
                optimizer_offload = engine.optimizer

            # True if at least one forward hook is registered.
            return len(optimizer_offload.forward_hooks) > 0

        model = SimpleModel(hidden_dim=100)
        engine, _, _, _ = deepspeed.initialize(args=None, model=model, config=config)

        with unwrap_model_for_generation(engine):
            # assert no hooks
            assert not hooks_exist(engine)
            # assert parameters gathered
            assert model.linears[0].weight.numel() != 0, "GatheredParameters should give a non-0-sized tensor"

        # assert hooks
        assert hooks_exist(engine)