Back to Repositories

Validating LoRA Integration with Hybrid Engine in DeepSpeed

This test suite validates the LoRA (Low-Rank Adaptation) implementation in DeepSpeed’s Hybrid Engine, focusing on integration with various language models and ZeRO optimization stages. The tests ensure proper functionality of LoRA weight manipulation, gradient computation, and model parameter updates.

Test Coverage Overview

The test suite covers essential LoRA functionality including:
  • Integration with multiple model architectures (GPT-Neo, OPT, BLOOM)
  • Compatibility with ZeRO stages 2 and 3
  • CPU offloading scenarios
  • Gradient computation and parameter updates
  • LoRA weight fusion and unfusion operations

Implementation Analysis

The test module defines a custom LinearLayer_LoRA class that adds a low-rank adapter to the linear layers of a transformer model. It uses pytest parametrization to run the same test across configurations: a batch size of 1, three model architectures, ZeRO stages 2 and 3, and optimizer offload set to "none" or "cpu".

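Key patterns include freezing all non-LoRA parameters, injecting LoRA into the model's linear layers, and checking that the base weights are unchanged after an optimizer step but do change once the LoRA weights are fused.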

Technical Details

Testing tools and configuration:
  • PyTest framework with distributed testing support
  • DeepSpeed’s HybridEngine and ZeRO optimizer
  • Hugging Face Transformers integration
  • FP16 precision training
  • Custom LoRA implementation with configurable dimensions

Best Practices Demonstrated

The test suite exemplifies robust testing practices including:
  • Comprehensive parameter verification before and after updates
  • Gradient norm validation
  • Proper model state management (train/eval modes)
  • Systematic parameter freezing and unfreezing
  • Memory-efficient weight handling

microsoft/deepspeed

tests/unit/hybrid_engine/test_he_lora.py

            
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import os
import math
import torch
import torch.nn.functional as F
import pytest
import deepspeed
from deepspeed.runtime.zero import GatheredParameters
from deepspeed.ops.op_builder import OpBuilder
from deepspeed.utils import safe_get_full_grad
import numpy.testing as npt
from unit.common import DistributedTest
from deepspeed.ops.op_builder import InferenceBuilder
from deepspeed.accelerator import get_accelerator

# Skip every test in this module when DeepSpeed reports the inference op as not compatible here.
if not deepspeed.ops.__compatible_ops__[InferenceBuilder.NAME]:
    pytest.skip("This op had not been implemented on this system.", allow_module_level=True)

# NOTE(review): transformers is imported after the skip above, presumably so that an unsupported
# system skips before this import is attempted — confirm.
from transformers import (AutoConfig, AutoTokenizer, AutoModelForCausalLM)

# Skip the module whenever a ROCm version other than (0, 0) is reported.
# NOTE(review): (0, 0) presumably means "no ROCm installation found" — confirm against OpBuilder.
rocm_version = OpBuilder.installed_rocm_version()
if rocm_version != (0, 0):
    pytest.skip("skip inference tests on rocm for now", allow_module_level=True)


def to_device(batch, device):
    """Return a new dict with each value of ``batch`` moved to ``device``.

    Values are moved with ``value.to(device)``. A value for which that call
    fails (for example a plain Python object without a ``to`` method) is
    copied into the result unchanged.

    Args:
        batch: Mapping-like object exposing ``items()``.
        device: Target handed straight through to each value's ``to`` method.

    Returns:
        A new ``dict`` with the same keys as ``batch``.
    """
    output = {}
    for k, v in batch.items():
        try:
            output[k] = v.to(device)
        except Exception:
            # Best-effort: keep values that cannot be moved. Catching
            # ``Exception`` instead of using a bare ``except`` lets
            # KeyboardInterrupt and SystemExit propagate.
            output[k] = v
    return output


def convert_linear_layer_to_lora(model, part_module_name, lora_dim=0, lora_scaling=1, lora_droppout=0):
    """Swap matching ``torch.nn.Linear`` submodules of ``model`` for ``LinearLayer_LoRA``.

    A submodule is replaced when it is a ``torch.nn.Linear`` and
    ``part_module_name`` is a substring of its dotted name. The replacement
    is built from the original layer's ``weight`` and ``bias`` and is moved
    to that weight's device and dtype.

    Args:
        model: Module whose ``named_modules()`` is searched.
        part_module_name: Substring to look for in module names.
        lora_dim: Forwarded to ``LinearLayer_LoRA``.
        lora_scaling: Forwarded to ``LinearLayer_LoRA``.
        lora_droppout: Forwarded to ``LinearLayer_LoRA``.

    Returns:
        The same ``model`` object, modified in place.
    """
    from deepspeed.compression.helper import recursive_getattr, recursive_setattr

    # All target names are gathered up front, before any replacement happens.
    target_names = [
        module_name for module_name, submodule in model.named_modules()
        if isinstance(submodule, torch.nn.Linear) and part_module_name in module_name
    ]

    for module_name in target_names:
        linear = recursive_getattr(model, module_name)
        lora_layer = LinearLayer_LoRA(linear.weight, lora_dim, lora_scaling, lora_droppout, linear.bias)
        lora_layer = lora_layer.to(linear.weight.device)
        lora_layer = lora_layer.to(linear.weight.dtype)
        recursive_setattr(model, module_name, lora_layer)

    return model


class LinearLayer_LoRA(torch.nn.Module):
    """A simple implementation of LoRA; for now only linear layers are supported.

    The layer keeps the given ``weight`` (with its gradient disabled) and
    ``bias`` and adds two trainable low-rank factors. While ``fuse_lora`` is
    False, ``forward`` returns::

        F.linear(x, weight, bias) + dropout(x) @ lora_right_weight @ lora_left_weight * lora_scaling

    Args:
        weight: Two-dimensional weight of the wrapped linear layer.
        lora_dim: Rank of the low-rank factors; must be positive.
        lora_scaling: Numerator of the scaling; the stored ``lora_scaling``
            attribute is ``lora_scaling / lora_dim``.
        lora_droppout: Dropout probability applied to the input of the
            low-rank branch; values ``<= 0`` select ``torch.nn.Identity``.
        bias: Optional bias of the wrapped linear layer.

    Raises:
        ValueError: If ``lora_dim`` is not positive.
    """

    def __init__(self, weight, lora_dim=0, lora_scaling=1, lora_droppout=0, bias=None):
        super().__init__()
        self.weight = weight
        self.bias = bias

        if lora_dim <= 0:
            raise ValueError("You are trying to use LoRA, whose reduced dim should be larger than 0")

        try:
            # for zero stage 3 (presumably ds_shape carries the full, un-partitioned shape — confirm)
            full_shape = weight.ds_shape
        except AttributeError:
            full_shape = weight.shape
        rows, columns = full_shape

        # Stored already transposed so that forward does not need to transpose again.
        self.lora_right_weight = torch.nn.Parameter(torch.zeros(columns, lora_dim))
        self.lora_left_weight = torch.nn.Parameter(torch.zeros(lora_dim, rows))
        self.lora_scaling = lora_scaling / lora_dim

        if lora_droppout > 0:
            self.lora_dropout = torch.nn.Dropout(lora_droppout)
        else:
            self.lora_dropout = torch.nn.Identity()

        self.reset_parameters()
        # disable the original weight gradient
        self.weight.requires_grad = False
        # When True, forward skips the low-rank branch.
        # NOTE(review): presumably set by external code once the LoRA update has been merged into weight.
        self.fuse_lora = False

    def eval(self):
        """Switch to evaluation mode and return ``self``, as ``nn.Module.eval`` does."""
        return self.train(False)

    def train(self, mode=True):
        """Set the training mode of this layer and its dropout submodule; return ``self``."""
        self.training = mode
        self.lora_dropout.train(mode)
        return self

    def reset_parameters(self):
        """Kaiming-uniform init for the right factor, zeros for the left factor.

        With the left factor at zero, the low-rank branch contributes nothing
        until the factors are trained.
        """
        torch.nn.init.kaiming_uniform_(self.lora_right_weight, a=math.sqrt(5))
        torch.nn.init.zeros_(self.lora_left_weight)

    def forward(self, input):
        """Apply the linear layer, adding the scaled low-rank term unless ``fuse_lora`` is set."""
        base_output = F.linear(input, self.weight, self.bias)
        if self.fuse_lora:
            return base_output
        lora_update = self.lora_dropout(input) @ self.lora_right_weight @ self.lora_left_weight
        return base_output + lora_update * self.lora_scaling


def only_optimize_lora_parameters(model):
    """Leave gradients enabled only for the LoRA factors of ``model``.

    A parameter counts as a LoRA factor when its name contains
    ``lora_right_weight`` or ``lora_left_weight``; those get
    ``requires_grad = True`` and every other parameter gets ``False``.

    Returns:
        The same ``model`` object, modified in place.
    """
    lora_markers = ("lora_right_weight", "lora_left_weight")
    for param_name, parameter in model.named_parameters():
        parameter.requires_grad = any(marker in param_name for marker in lora_markers)
    return model


@pytest.mark.seq_inference
@pytest.mark.parametrize("batch_size", [1], ids=["bsz=1"])
@pytest.mark.parametrize("zero_stage", [2, 3], ids=["zero_stage=2", "zero_stage=3"])
@pytest.mark.parametrize("model_name", ["EleutherAI/gpt-neo-125m", "facebook/opt-350m", "bigscience/bloom-560m"])
@pytest.mark.parametrize("offload_device", ["none", "cpu"])
class TestHybridEngineLoRA(DistributedTest):
    """Runs one LoRA training step through a DeepSpeed engine with the hybrid engine enabled.

    Parametrized over model name, ZeRO stage and optimizer offload device,
    with a single batch size of 1.
    """
    world_size = 1

    def get_model(self, model_name):
        """Load ``model_name`` as an fp16 causal LM on the accelerator device for ``LOCAL_RANK``.

        ``dropout`` is set to 0.0 on the config before the model is built.
        """
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
        model_config = AutoConfig.from_pretrained(model_name)
        model_config.dropout = 0.0
        model = AutoModelForCausalLM.from_pretrained(model_name, config=model_config)
        model = model.half()
        device = get_accelerator().device_name()
        model = model.to(f'{device}:{local_rank}')
        return model

    def get_tokenizer(self, model_name):
        """Load the tokenizer for ``model_name`` and use its EOS token as the padding token."""
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        tokenizer.pad_token = tokenizer.eos_token
        return tokenizer

    def get_train_sentences(self, batch_size):
        """Return the first ``batch_size`` training prompts.

        Raises:
            NotImplementedError: If ``batch_size`` is negative or larger than
                the number of available prompts.
        """
        # Raw strings: every backslash-n below is a literal backslash followed by "n", not a newline.
        sentences = [
            r"\n\nHuman: I am trying to write a fairy tale. What is the most popular plot?\n\n"
            r"Assistant: The most popular plot might be a princess goes to a faraway land, falls in love",
            r"\n\nHuman: What flowers should I grow to attract bees?\n\nAssistant: The reason you want bees "
            r"in your garden is to attract pollinators and get more fruit or vegetable production."
        ]
        # A negative size would silently slice from the end of the list, so it is rejected as well.
        if 0 <= batch_size <= len(sentences):
            return sentences[:batch_size]
        raise NotImplementedError(f"batch_size {batch_size} not implemented")

    @staticmethod
    def _snapshot_layer0_weights(model):
        """Return float32 NumPy copies of the multi-dimensional entries of ``model.layer_params[0]``.

        Entries that are ``None`` or have a single dimension are left out.
        """
        return [
            ele.detach().cpu().float().numpy() for ele in model.layer_params[0]
            if ele is not None and len(ele.shape) > 1
        ]

    def test_lora(self, batch_size, model_name, zero_stage, offload_device):
        """Train one step with LoRA injected, then check gradients, frozen weights and LoRA fusing.

        Checks, in order:
          1. the summed gradient norm of the trainable parameters is above 1e-5;
          2. the layer-0 entries of ``layer_params`` are unchanged by the optimizer step;
          3. after ``fuse_lora_weight()`` those entries differ from the initial snapshot.
        """
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
        model = self.get_model(model_name)
        tokenizer = self.get_tokenizer(model_name)
        train_sentences = self.get_train_sentences(batch_size)

        # Inject LoRA (an empty name filter matches every linear layer) and train only the LoRA factors.
        model = convert_linear_layer_to_lora(model, "", 8)
        model = only_optimize_lora_parameters(model)

        ds_config = {
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 1.0,
                    "betas": [0.9, 0.95]
                }
            },
            "train_batch_size": batch_size,
            "fp16": {
                "enabled": True,
                "initial_scale_power": 12
            },
            "hybrid_engine": {
                "enabled": True,
                "pin_parameters": True
            },
            "zero_optimization": {
                "stage": zero_stage,
                "offload_optimizer": {
                    "device": offload_device
                }
            }
        }

        model, *_ = deepspeed.initialize(model=model, config=ds_config)

        # Snapshot taken before any update; both later comparisons are made against it.
        before_grad_update_layer0_params = self._snapshot_layer0_weights(model)

        model.train()
        batch = tokenizer(train_sentences, max_length=16, padding="max_length", truncation=True, return_tensors="pt")
        device = get_accelerator().device_name()
        batch = to_device(batch, f'{device}:{local_rank}')
        batch["labels"] = batch["input_ids"]
        outputs = model(**batch, use_cache=False)
        loss = outputs.loss
        model.backward(loss)

        # Gradient norms are read after backward and before the optimizer step.
        grad_norm_dict = {}
        for name, param in model.named_parameters():
            if param.requires_grad:
                grad_norm_dict[name] = torch.linalg.norm(safe_get_full_grad(param))

        model.step()

        # Verify gradient norm is larger than 0
        grad_norm = sum(ele.detach().cpu().numpy() for ele in grad_norm_dict.values())
        assert grad_norm > 1E-5

        # Verify parameter remains the same
        after_grad_update_layer0_params = self._snapshot_layer0_weights(model)
        for lhs, rhs in zip(before_grad_update_layer0_params, after_grad_update_layer0_params):
            npt.assert_allclose(lhs, rhs, rtol=1E-5, atol=1E-5)

        # Verify fuse will mutate layer_params
        model.eval()
        with GatheredParameters(model.parameters()):
            model.fuse_lora_weight()

        after_grad_update_layer0_params_lora_fused = self._snapshot_layer0_weights(model)

        for lhs, rhs in zip(before_grad_update_layer0_params, after_grad_update_layer0_params_lora_fused):
            # assert_allclose is expected to fail here: the fused weights must differ from the snapshot.
            with pytest.raises(AssertionError):
                npt.assert_allclose(lhs, rhs, rtol=1E-5, atol=1E-5)

        with GatheredParameters(model.parameters()):
            model.unfuse_lora_weight()