Testing MoE Tensor Parallelism Implementation in DeepSpeed
This test suite validates DeepSpeed's Mixture of Experts (MoE) implementation under tensor parallelism, covering how experts are distributed across expert-parallel ranks, how expert weights are sharded when expert tensor parallelism is enabled, and how the DeepSpeed engine is initialized with a custom model-parallel unit (MPU).
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
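Before the full listing, here is a minimal, torch-only sketch (an illustration, not part of the test file) of what expert tensor parallelism means for the expert module the test builds: each tensor-parallel rank holds a 1/tp_size slice of the expert's feed-forward network, with the 4 * hidden_dim intermediate width split across ranks while the input and output width stay at hidden_dim.

import torch

hidden_dim = 16  # same hidden size the test uses
tp_size = 2      # tensor-parallel degree; one of the values the test sweeps

# Each rank's slice of one expert: the intermediate dimension is divided
# by tp_size, so the tp_size slices together form one logical expert.
tensor_parallel_expert = torch.nn.Sequential(
    torch.nn.Linear(hidden_dim, 4 * hidden_dim // tp_size),
    torch.nn.ReLU(),
    torch.nn.Linear(4 * hidden_dim // tp_size, hidden_dim),
)

x = torch.randn(8, hidden_dim)
print(tensor_parallel_expert(x).shape)  # torch.Size([8, 16])

The test below hands this sliced module to deepspeed.moe.layer.MoE; when enable_expert_tensor_parallelism=True, DeepSpeed treats the expert weights as sharded across the tensor-parallel group supplied by the MPU class, which is what the final assertion on _get_expert_model_parallel_world_size() checks.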
microsoft/deepspeed
tests/unit/moe/test_moe_tp.py
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import deepspeed
import pytest
from unit.common import DistributedTest
from deepspeed.utils.torch import required_torch_version
from deepspeed.moe.layer import MoE
class MPU():

    def __init__(self, tp_world_size):
        self.rank = deepspeed.comm.get_rank()
        self.world_size = deepspeed.comm.get_world_size()
        self.tp_world_size = tp_world_size

        # Tensor-parallel groups: blocks of tp_world_size consecutive ranks.
        for i in range(0, self.world_size, tp_world_size):
            ranks = range(i, i + tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.tp_group = group

        # Data-parallel groups: ranks that share the same tensor-parallel rank.
        for i in range(0, tp_world_size):
            ranks = range(i, self.world_size, tp_world_size)
            group = deepspeed.comm.new_group(ranks)
            if self.rank in ranks:
                self.dp_group = group

    def get_model_parallel_rank(self):
        return self.rank % self.tp_world_size

    def get_model_parallel_world_size(self):
        return self.tp_world_size

    def get_data_parallel_rank(self):
        return self.rank // self.tp_world_size

    def get_data_parallel_world_size(self):
        return self.world_size // self.tp_world_size

    def get_data_parallel_group(self):
        return self.dp_group

    def get_model_parallel_group(self):
        return self.tp_group
@pytest.mark.parametrize("ep_size, tp_size", [(1, 2), (1, 4), (2, 2)])
@pytest.mark.parametrize("enable_expert_tp", [True, False])
@pytest.mark.parametrize("use_residual", [True, False])
class TestMOETensorParallel(DistributedTest):
world_size = 4
def test(self, ep_size, tp_size, enable_expert_tp, use_residual):
# TODO: replace this with a true parallel mlp in the future
# and run convergence tests
if not required_torch_version(min_version=1.8):
pytest.skip("DeepSpeed MoE tests need torch 1.8 or higher to run correctly")
config_dict = {"train_batch_size": 8, "steps_per_print": 1, "fp16": {"enabled": True}}
hidden_dim = 16
tensor_parallel_expert = torch.nn.Sequential(torch.nn.Linear(hidden_dim, 4 * hidden_dim // tp_size),
torch.nn.ReLU(),
torch.nn.Linear(4 * hidden_dim // tp_size, hidden_dim))
# set num experts to world size
world_size = deepspeed.comm.get_world_size()
model = MoE(
hidden_size=hidden_dim,
expert=tensor_parallel_expert,
num_experts=world_size,
ep_size=ep_size,
use_residual=use_residual,
enable_expert_tensor_parallelism=enable_expert_tp,
)
optimizer = torch.optim.AdamW(params=model.parameters())
model, _, _, _ = deepspeed.initialize(config=config_dict,
model=model,
optimizer=optimizer,
dist_init_required=False,
mpu=MPU(tp_size))
assert model.num_local_experts == world_size // ep_size
if enable_expert_tp:
assert deepspeed.utils.groups._get_expert_model_parallel_world_size() == tp_size
else:
assert deepspeed.utils.groups._get_expert_model_parallel_world_size() == 1
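To make the process-group layout concrete, here is a small standalone sketch (plain Python, no deepspeed.comm calls) that mirrors the MPU loop arithmetic for world_size = 4 and tp_size = 2, the configuration this test class runs with:

world_size, tp_size = 4, 2

# Tensor-parallel groups: blocks of tp_size consecutive ranks.
tp_groups = [list(range(i, i + tp_size)) for i in range(0, world_size, tp_size)]
# Data-parallel groups: ranks that share the same tensor-parallel rank.
dp_groups = [list(range(i, world_size, tp_size)) for i in range(tp_size)]

print(tp_groups)  # [[0, 1], [2, 3]]
print(dp_groups)  # [[0, 2], [1, 3]]

for rank in range(world_size):
    # Matches MPU.get_model_parallel_rank() and MPU.get_data_parallel_rank().
    print(rank, rank % tp_size, rank // tp_size)

The final assertions follow the same arithmetic: num_experts is set to the world size (4), so with ep_size = 2 each expert-parallel rank holds 4 // 2 = 2 local experts, and _get_expert_model_parallel_world_size() reports tp_size only when expert tensor parallelism is enabled. When the file is run under pytest, the DistributedTest base class from unit.common launches world_size = 4 worker processes for each of the 12 parametrized combinations (three ep_size/tp_size pairs, times two values each for enable_expert_tp and use_residual).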