Validating LoRA Integration with Hybrid Engine in DeepSpeed
This test suite validates the LoRA (Low-Rank Adaptation) implementation in DeepSpeed's Hybrid Engine, focusing on integration with several language models and ZeRO optimization stages. The tests verify that LoRA weights can be injected and fused, that gradients flow only to the low-rank parameters, and that the frozen base parameters stay fixed across optimizer steps.
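At its core, LoRA freezes a pretrained linear weight W and learns a low-rank update: the layer output becomes x @ W.T + scaling * (x @ A @ B), where only the small factors A and B receive gradients. A minimal sketch of this reparameterization, with illustrative shapes and variable names chosen to mirror the test file below:

import torch

in_features, out_features, lora_dim = 16, 32, 4
x = torch.randn(2, in_features)
W = torch.randn(out_features, in_features)  # frozen pretrained weight
A = torch.randn(in_features, lora_dim)      # "lora_right_weight" (trainable)
B = torch.zeros(lora_dim, out_features)     # "lora_left_weight" (trainable, starts at zero)
scaling = 1.0 / lora_dim
y = x @ W.t() + (x @ A @ B) * scaling       # equals x @ W.t() at init, since B == 0

Because B starts at zero, the LoRA-wrapped layer initially reproduces the base layer exactly; training then moves only A and B.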
Source: microsoft/deepspeed, tests/unit/hybrid_engine/test_he_lora.py
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import os
import math
import torch
import torch.nn.functional as F
import pytest
import deepspeed
from deepspeed.runtime.zero import GatheredParameters
from deepspeed.ops.op_builder import OpBuilder, InferenceBuilder
from deepspeed.utils import safe_get_full_grad
import numpy.testing as npt
from unit.common import DistributedTest
from deepspeed.accelerator import get_accelerator
if not deepspeed.ops.__compatible_ops__[InferenceBuilder.NAME]:
    pytest.skip("This op has not been implemented on this system.", allow_module_level=True)
from transformers import (AutoConfig, AutoTokenizer, AutoModelForCausalLM)
rocm_version = OpBuilder.installed_rocm_version()
if rocm_version != (0, 0):
    pytest.skip("skip inference tests on rocm for now", allow_module_level=True)
def to_device(batch, device):
    output = {}
    for k, v in batch.items():
        try:
            output[k] = v.to(device)
        except AttributeError:
            # non-tensor entries (e.g. lists of strings) are kept as-is
            output[k] = v
    return output
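# The helper below scans the model for torch.nn.Linear modules whose qualified
# name contains part_module_name (an empty string matches every Linear) and
# replaces each one with a LinearLayer_LoRA wrapper around the original weight.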
def convert_linear_layer_to_lora(model, part_module_name, lora_dim=0, lora_scaling=1, lora_dropout=0):
    from deepspeed.compression.helper import recursive_getattr, recursive_setattr

    replace_name = []
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear) and part_module_name in name:
            replace_name.append(name)
    for name in replace_name:
        module = recursive_getattr(model, name)
        tmp = LinearLayer_LoRA(module.weight, lora_dim, lora_scaling, lora_dropout,
                               module.bias).to(module.weight.device).to(module.weight.dtype)
        recursive_setattr(model, name, tmp)
    return model
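# LinearLayer_LoRA computes F.linear(x, W, b) + scaling * (dropout(x) @ A @ B),
# where A ("lora_right_weight", shape [in, r]) is Kaiming-initialized and
# B ("lora_left_weight", shape [r, out]) starts at zero, so at initialization
# the wrapped layer reproduces the base layer's output exactly.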
class LinearLayer_LoRA(torch.nn.Module):
    # a simple implementation of LoRA
    # for now only supports Linear layers
    def __init__(self, weight, lora_dim=0, lora_scaling=1, lora_dropout=0, bias=None):
        super(LinearLayer_LoRA, self).__init__()
        self.weight = weight
        self.bias = bias

        if lora_dim <= 0:
            raise ValueError("You are trying to use LoRA, whose reduced dim should be larger than 0")

        try:
            # for ZeRO stage 3, the weight is partitioned and ds_shape holds the full shape
            rows, columns = weight.ds_shape
        except AttributeError:
            rows, columns = weight.shape
        self.lora_right_weight = torch.nn.Parameter(torch.zeros(
            columns, lora_dim))  # apply transpose so in forward we do not need to transpose again
        self.lora_left_weight = torch.nn.Parameter(torch.zeros(lora_dim, rows))
        self.lora_scaling = lora_scaling / lora_dim

        if lora_dropout > 0:
            self.lora_dropout = torch.nn.Dropout(lora_dropout)
        else:
            self.lora_dropout = torch.nn.Identity()

        self.reset_parameters()
        # disable gradient for the original (frozen) weight
        self.weight.requires_grad = False
        # whether LoRA has been fused into the original weight
        self.fuse_lora = False

    def eval(self):
        self.lora_dropout.eval()

    def train(self, mode=True):
        self.lora_dropout.train(mode)

    def reset_parameters(self):
        torch.nn.init.kaiming_uniform_(self.lora_right_weight, a=math.sqrt(5))
        torch.nn.init.zeros_(self.lora_left_weight)

    def forward(self, input):
        if self.fuse_lora:
            return F.linear(input, self.weight, self.bias)
        else:
            return F.linear(input, self.weight, self.bias) + (
                self.lora_dropout(input) @ self.lora_right_weight @ self.lora_left_weight) * self.lora_scaling
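# Note: the fuse_lora flag above lets callers fold the low-rank product into the
# base weight ahead of time; once fused, forward() reduces to a plain F.linear
# call until the weights are unfused again.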
def only_optimize_lora_parameters(model):
    # turn off gradients for all parameters except the LoRA parameters
    for name, param in model.named_parameters():
        if "lora_right_weight" in name or "lora_left_weight" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False
    return model
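# The test matrix below exercises three HF causal-LM architectures, ZeRO stages
# 2 and 3, and optimizer offload to "none" vs. "cpu", all with the Hybrid
# Engine enabled on a single worker (world_size = 1).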
@pytest.mark.seq_inference
@pytest.mark.parametrize("batch_size", [1], ids=["bsz=1"])
@pytest.mark.parametrize("zero_stage", [2, 3], ids=["zero_stage=2", "zero_stage=3"])
@pytest.mark.parametrize("model_name", ["EleutherAI/gpt-neo-125m", "facebook/opt-350m", "bigscience/bloom-560m"])
@pytest.mark.parametrize("offload_device", ["none", "cpu"])
class TestHybridEngineLoRA(DistributedTest):
    world_size = 1

    def get_model(self, model_name):
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
        model_config = AutoConfig.from_pretrained(model_name)
        model_config.dropout = 0.0
        model = AutoModelForCausalLM.from_pretrained(model_name, config=model_config)
        model = model.half()
        device = get_accelerator().device_name()
        model = model.to(f'{device}:{local_rank}')
        return model

    def get_tokenizer(self, model_name):
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        tokenizer.pad_token = tokenizer.eos_token
        return tokenizer

    def get_train_sentences(self, batch_size):
        sentences = [
            r"\n\nHuman: I am trying to write a fairy tale. What is the most popular plot?\n\n"
            r"Assistant: The most popular plot might be a princess goes to a faraway land, falls in love",
            r"\n\nHuman: What flowers should I grow to attract bees?\n\nAssistant: The reason you want bees "
            r"in your garden is to attract pollinators and get more fruit or vegetable production."
        ]
        if batch_size <= 2:
            return sentences[:batch_size]
        else:
            raise NotImplementedError(f"batch_size {batch_size} not implemented")
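    # The literal \n\n and "Human:"/"Assistant:" markers follow the chat-style
    # prompt format common in RLHF-style fine-tuning data.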
    def test_lora(self, batch_size, model_name, zero_stage, offload_device):
        local_rank = int(os.getenv("LOCAL_RANK", "0"))
        model = self.get_model(model_name)
        tokenizer = self.get_tokenizer(model_name)
        train_sentences = self.get_train_sentences(batch_size)

        # Inject LoRA into every Linear layer and freeze all non-LoRA parameters
        model = convert_linear_layer_to_lora(model, "", 8)
        model = only_optimize_lora_parameters(model)
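        # The config below enables the Hybrid Engine together with fp16 and ZeRO.
        # The deliberately large lr (1.0) makes a single optimizer step move the
        # LoRA weights by an easily measurable amount.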
        ds_config = {
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 1.0,
                    "betas": [0.9, 0.95]
                }
            },
            "train_batch_size": batch_size,
            "fp16": {
                "enabled": True,
                "initial_scale_power": 12
            },
            "hybrid_engine": {
                "enabled": True,
                "pin_parameters": True
            },
            "zero_optimization": {
                "stage": zero_stage,
                "offload_optimizer": {
                    "device": offload_device
                }
            }
        }
        model, *_ = deepspeed.initialize(model=model, config=ds_config)
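        # With hybrid_engine enabled, the returned engine exposes layer_params,
        # per-transformer-layer parameter lists used by the inference path; the
        # test snapshots layer 0 below to track which weights actually change.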
        # Verify that the gradient norm is larger than 0
        before_grad_update_layer0_params = [
            ele.detach().cpu().float().numpy() for ele in model.layer_params[0]
            if ele is not None and len(ele.shape) > 1
        ]

        model.train()
        batch = tokenizer(train_sentences, max_length=16, padding="max_length", truncation=True, return_tensors="pt")
        device = get_accelerator().device_name()
        batch = to_device(batch, f'{device}:{local_rank}')
        batch["labels"] = batch["input_ids"]
        outputs = model(**batch, use_cache=False)
        loss = outputs.loss
        model.backward(loss)

        grad_norm_dict = dict()
        for name, param in model.named_parameters():
            if param.requires_grad:
                grad_norm_dict[name] = torch.linalg.norm(safe_get_full_grad(param))
        model.step()
        grad_norm = sum([ele.detach().cpu().numpy() for ele in grad_norm_dict.values()])
        assert grad_norm > 1E-5

        # Verify that the base (frozen) parameters remain unchanged after the step
        after_grad_update_layer0_params = [
            ele.detach().cpu().float().numpy() for ele in model.layer_params[0]
            if ele is not None and len(ele.shape) > 1
        ]
        for lhs, rhs in zip(before_grad_update_layer0_params, after_grad_update_layer0_params):
            npt.assert_allclose(lhs, rhs, 1E-5, 1E-5)

        # Verify that fusing the LoRA weights mutates layer_params
        model.eval()
        with GatheredParameters(model.parameters()):
            model.fuse_lora_weight()
        after_grad_update_layer0_params_lora_fused = [
            ele.detach().cpu().float().numpy() for ele in model.layer_params[0]
            if ele is not None and len(ele.shape) > 1
        ]
        for lhs, rhs in zip(before_grad_update_layer0_params, after_grad_update_layer0_params_lora_fused):
            with pytest.raises(AssertionError):
                npt.assert_allclose(lhs, rhs, 1E-5, 1E-5)
        with GatheredParameters(model.parameters()):
            model.unfuse_lora_weight()
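Assuming a DeepSpeed source checkout with the unit-test requirements installed, an individual configuration can be selected with pytest's -k filter, for example:

pytest tests/unit/hybrid_engine/test_he_lora.py -k "bloom and cpu"

DistributedTest spawns the distributed worker processes itself (world_size = 1 here), and the module-level guards at the top skip the entire file on systems without the DeepSpeed inference ops or on ROCm builds. The exact invocation may vary with the local environment.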