Testing BFloat16 Precision Support in the DeepSpeed Framework
A comprehensive test suite for DeepSpeed's bfloat16 (BF16) precision support, focusing on compatibility with various optimizers, ZeRO stages, and distributed training scenarios. The tests validate BF16 functionality across configurations including OneCycle scheduling, empty gradients and partitions, unsupported-optimizer handling, and mixed compute/communication data types.
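Every test below follows the same core pattern: build a DeepSpeed engine from a config that explicitly disables fp16 and enables bf16, layer ZeRO on top, and drive a small model through a forward/backward/step loop in torch.bfloat16. A minimal sketch of that shared pattern (illustrative values only; each test varies the batch size, optimizer, scheduler, and ZeRO options, and uses the suite's own SimpleModel helper):

    import torch
    import deepspeed

    net = torch.nn.Linear(10, 10)  # stand-in for the suite's SimpleModel
    config_dict = {
        "train_micro_batch_size_per_gpu": 1,
        "optimizer": {"type": "Adam", "params": {"lr": 0.00015}},
        "fp16": {"enabled": False},  # fp16 and bf16 are never enabled together
        "bf16": {"enabled": True},   # route training through the BF16 optimizer path
        "zero_optimization": {"stage": 2},
    }
    engine, _, _, _ = deepspeed.initialize(config=config_dict,
                                           model=net,
                                           model_parameters=net.parameters())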
microsoft/deepspeed
tests/unit/runtime/half_precision/test_bf16.py
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import deepspeed
import pytest
from deepspeed.ops.adam import FusedAdam
from unit.common import DistributedTest
from deepspeed.ops.op_builder import CPUAdamBuilder
from unit.simple_model import SimpleModel, SimpleOptimizer, random_dataloader
from unit.util import bf16_required_version_check
from deepspeed import comm as dist
from deepspeed.accelerator import get_accelerator
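
# Checks that BF16 training composes with the OneCycle LR/momentum scheduler
# under ZeRO: the engine is built from the config below and driven through a
# short forward/backward/step loop. Only successful execution is asserted,
# not loss values.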
class TestAdamBF16ZeroOneCycleCompatibility(DistributedTest):
    world_size = 1

    def test(self, zero_stage=2, use_cpu_offload=False):
        if not bf16_required_version_check():
            pytest.skip(
                "DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA >= 11.0 and HW support for BFloat16 to run correctly"
            )

        if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
            pytest.skip("cpu-adam is not compatible")

        config_dict = {
            "train_micro_batch_size_per_gpu": 1,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                }
            },
            "scheduler": {
                "type": "OneCycle",
                "params": {
                    "cycle_first_step_size": 16000,
                    "cycle_first_stair_count": 8000,
                    "decay_step_size": 16000,
                    "cycle_min_lr": 1e-06,
                    "cycle_max_lr": 3e-05,
                    "decay_lr_rate": 1e-07,
                    "cycle_min_mom": 0.85,
                    "cycle_max_mom": 0.99,
                    "decay_mom_rate": 0.0
                }
            },
            "fp16": {
                "enabled": False
            },
            "bf16": {
                "enabled": True
            },
            "zero_optimization": {
                "stage": zero_stage,
                "cpu_offload": use_cpu_offload
            }
        }

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.bfloat16)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
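
# With "zero_allow_untested_optimizer" left at False, handing ZeRO an
# unrecognized client optimizer (SimpleOptimizer) must fail fast: the test
# expects deepspeed.initialize to raise an AssertionError.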
class TestZeroAllowUntestedOptimizer(DistributedTest):
    world_size = 1

    def test(self, zero_stage=2, use_cpu_offload=False):
        if not bf16_required_version_check():
            pytest.skip(
                "DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA >= 11.0 and HW support for BFloat16 to run correctly"
            )

        if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
            pytest.skip("cpu-adam is not compatible")

        config_dict = {
            "train_micro_batch_size_per_gpu": 4,
            "steps_per_print": 1,
            "fp16": {
                "enabled": False,
            },
            "bf16": {
                "enabled": True
            },
            "zero_optimization": {
                "stage": zero_stage,
                "cpu_offload": use_cpu_offload
            },
            "zero_allow_untested_optimizer": False
        }

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        optimizer = SimpleOptimizer(model.parameters())
        with pytest.raises(AssertionError):
            model, optim, _, _ = deepspeed.initialize(config=config_dict,
                                                      model=model,
                                                      optimizer=optimizer,
                                                      model_parameters=model.parameters())
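
# Runs with data parallelism of 3 but a model of only 2 parameters, so at
# least one rank ends up with an empty optimizer-state partition; the tiny
# bucket sizes keep the partitioning code paths honest. A single training
# sample must still go through.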
class TestZeroEmptyPartition(DistributedTest):
    world_size = 3

    def test(self, zero_stage=2, use_cpu_offload=False):
        if not bf16_required_version_check():
            pytest.skip(
                "DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA >= 11.0 and HW support for BFloat16 to run correctly"
            )

        if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
            pytest.skip("cpu-adam is not compatible")

        if zero_stage == 3:
            pytest.skip("skip for now")

        config_dict = {
            "train_micro_batch_size_per_gpu": 1,
            "gradient_accumulation_steps": 1,
            "fp16": {
                "enabled": False
            },
            "bf16": {
                "enabled": True
            },
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                }
            },
            "zero_optimization": {
                "stage": zero_stage,
                "cpu_offload": use_cpu_offload,
                "reduce_bucket_size": 100,
                "allgather_bucket_size": 100
            }
        }

        hidden_dim = 1
        model = SimpleModel(hidden_dim)

        # Ensure model has 2 parameters, to cause empty partition with DP=3
        assert len(list(model.parameters())) == 2

        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())

        # Now make sure things work
        data_loader = random_dataloader(model=model,
                                        total_samples=1,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.bfloat16)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
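
# Parametrized over torch.optim.Adam and DeepSpeed's FusedAdam: both are
# supported client optimizers, so ZeRO stage 2 with BF16 should wrap either
# one without complaint (initialization alone is the test).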
@pytest.mark.parametrize("optimizer_constructor", [torch.optim.Adam, FusedAdam])
class TestZeroSupportedClientOptimizer(DistributedTest):
world_size = 1
def test(self, optimizer_constructor, zero_stage=2):
if not bf16_required_version_check():
pytest.skip(
" DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA > =11.0 and HW support for BFloat16 to run correctly"
)
config_dict = {
"train_micro_batch_size_per_gpu": 2,
"steps_per_print": 1,
"fp16": {
"enabled": False
},
"bf16": {
"enabled": True
},
"zero_optimization": {
"stage": zero_stage
}
}
hidden_dim = 10
model = SimpleModel(hidden_dim)
client_optimizer = optimizer_constructor(params=model.parameters())
model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, optimizer=client_optimizer)
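
# Turns reduce-scatter off so ZeRO stage 2 reduces gradients via its fallback
# (allreduce-style) path, and verifies BF16 training still runs with
# contiguous gradients enabled and communication overlap disabled.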
class TestZero2ReduceScatterOff(DistributedTest):
    world_size = 2

    def test(self):
        if not bf16_required_version_check():
            pytest.skip(
                "DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA >= 11.0 and HW support for BFloat16 to run correctly"
            )

        config_dict = {
            "train_micro_batch_size_per_gpu": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                }
            },
            "gradient_clipping": 1.0,
            "zero_optimization": {
                "stage": 2,
                "contiguous_gradients": True,
                "allgather_bucket_size": 2000000000,
                "reduce_bucket_size": 200000000,
                "overlap_comm": False,
                "reduce_scatter": False
            },
            "fp16": {
                "enabled": False
            },
            "bf16": {
                "enabled": True
            }
        }

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.bfloat16)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
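
# Pairs a plain torch.optim.Adam with ZeRO stage 2 under BF16 and runs a
# short training loop; as the name suggests, this guards BF16 handling of
# steps in which gradient buffers can be empty.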
class TestZeroEmptyGrad(DistributedTest):
    world_size = 1

    def test(self, stage=2):
        if not bf16_required_version_check():
            pytest.skip(
                "DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA >= 11.0 and HW support for BFloat16 to run correctly"
            )

        config_dict = {
            "train_micro_batch_size_per_gpu": 1,
            "steps_per_print": 1,
            "fp16": {
                "enabled": False
            },
            "bf16": {
                "enabled": True
            },
            "zero_optimization": {
                "stage": stage
            }
        }

        hidden_dim = 10
        model = SimpleModel(hidden_dim)
        optimizer = torch.optim.Adam(model.parameters())
        model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, optimizer=optimizer)
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.bfloat16)
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
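
# Cross-checks combinations of compute dtype (fp16/bf16/fp32) and
# communication dtype (fp16/bf16/engine default): dist.reduce is temporarily
# monkey-patched so every gradient reduction asserts that tensors travel in
# the configured communication_data_type.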
@pytest.mark.parametrize("comp_type", [torch.float16, torch.bfloat16, torch.float], ids=["fp16", "bf16", "fp32"])
@pytest.mark.parametrize("comm_type", [torch.float16, torch.bfloat16, None], ids=["fp16", "bf16", "default"])
class TestZeroDtypeCocktail(DistributedTest):
world_size = 2
def test(self, comp_type, comm_type):
if comp_type == torch.bfloat16 or comm_type == torch.bfloat16:
if not bf16_required_version_check():
pytest.skip(
" DeepSpeed BFloat16 tests need torch >= 1.10, NCCL >= 2.10.3, CUDA > =11.0 and HW support for BFloat16 to run correctly"
)
if comp_type == torch.float16 or comm_type == torch.float16:
if not get_accelerator().is_fp16_supported():
pytest.skip("fp16 is not supported")
type_str = {torch.float16: "fp16", torch.bfloat16: "bf16"}
config_dict = {
"train_micro_batch_size_per_gpu": 2,
"steps_per_print": 1,
"fp16": {
"enabled": comp_type == torch.float16
},
"bf16": {
"enabled": comp_type == torch.bfloat16
},
"zero_optimization": {
"stage": 2
},
}
if comm_type is not None:
config_dict["communication_data_type"] = type_str[comm_type]
else:
comm_type = comp_type
hidden_dim = 10
model = SimpleModel(hidden_dim)
optimizer = torch.optim.Adam(model.parameters())
model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, optimizer=optimizer)
data_loader = random_dataloader(model=model,
total_samples=2,
hidden_dim=hidden_dim,
device=model.device,
dtype=comp_type)
def custom_reduce(tensor, dst, op=dist.ReduceOp.SUM, group=None, async_op=False):
assert tensor.dtype == comm_type
return orig_torch_reduce(tensor, dst, op, group, async_op)
orig_torch_reduce = dist.reduce
dist.reduce = custom_reduce
for n, batch in enumerate(data_loader):
loss = model(batch[0], batch[1])
model.backward(loss)
model.step()
dist.reduce = orig_torch_reduce
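A note on the final test's monkey-patching: dist.reduce is swapped for a wrapper that asserts the communication dtype and restored only after the loop completes, so a failing assertion would leave the patch in place for anything else running in the same process. A slightly more defensive version of the same pattern (an editorial sketch, not the upstream code) wraps the loop in try/finally:

    orig_torch_reduce = dist.reduce

    def custom_reduce(tensor, dst, op=dist.ReduceOp.SUM, group=None, async_op=False):
        assert tensor.dtype == comm_type  # gradients must travel in the configured dtype
        return orig_torch_reduce(tensor, dst, op, group, async_op)

    dist.reduce = custom_reduce
    try:
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
    finally:
        dist.reduce = orig_torch_reduce  # restore even if an assertion fires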