Testing Compressed Communication Performance in DeepSpeed
This test suite evaluates the performance of compressed communication in DeepSpeed, focusing on the CompressedBackend implementation for distributed training. It measures latency, throughput, and bus bandwidth for compressed allreduce operations on a BERT-Large-scale tensor (300 * 2**20 ≈ 315M float32 elements).
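The bus-bandwidth figure printed at the end follows the standard allreduce convention (the same one used by nccl-tests): for n ranks, busbw = algbw * 2(n - 1)/n, where algbw is the raw bytes moved divided by the mean latency. The script applies exactly this formula in its final lines.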
Source: tests/onebit/test_compressed_perf.py (microsoft/deepspeed)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import deepspeed.comm as dist
import numpy as np
import argparse
import deepspeed
import os
from deepspeed.runtime.comm.compressed import CompressedBackend
from deepspeed.utils.timer import SynchronizedWallClockTimer
from deepspeed.accelerator import get_accelerator
from statistics import mean
timers = SynchronizedWallClockTimer()
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=-1)
args = parser.parse_args()
deepspeed.init_distributed(dist_backend=get_accelerator().communication_backend_name())
args.local_rank = int(os.environ['LOCAL_RANK'])
get_accelerator().set_device(args.local_rank)
device = torch.device(get_accelerator().device_name(), args.local_rank)
size = dist.get_world_size()
rank = dist.get_rank()
backend = CompressedBackend()
local_rank = args.local_rank
# Setting tensor_size (BERT-Large)
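# 300 * 2**20 = 314,572,800 elements, on the order of BERT-Large's ~340M parameters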
tensor_size = 300 * 2**20
server_size = int(tensor_size / size)
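# Pad the buffer length to a multiple of 8 * world_size so that sign bits pack
# evenly into bytes and the tensor chunks evenly across ranks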
if tensor_size % (8 * size) != 0:
    right_tensor_size = tensor_size + (8 * size - (tensor_size % (8 * size)))
else:
    right_tensor_size = tensor_size
right_server_size = right_tensor_size // size
# Add a rank-dependent bias to the gradient we are communicating
# so that no elements are vanishingly small
a = (torch.rand(tensor_size, device=device) - 0.5) + 0.01 * rank
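# Error-feedback buffers: compressed_allreduce accumulates the compression
# residual here (worker-side and server-side) across calls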
worker_error = torch.zeros(right_tensor_size, device=device)
server_error = torch.zeros(right_server_size, device=device)
warmup = 10
iters = 10
# Warmup
for i in range(warmup):
    backend.compressed_allreduce(a, worker_error, server_error, local_rank)
time_list = []
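# Mimic the backend's 1-bit compression (sign tensor times a single scale,
# with scale = norm / sqrt(numel)) only to report the buffer shape below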
a_sign = a.sign().add_(1).bool().float().add_(-0.5).mul_(2.0)
scale = a.norm() / np.sqrt(a.numel())
a_compressed = scale * a_sign
print("Shape of the compressed buffer:", a_compressed.shape) if rank == 0 else None
for i in range(iters):
    timers('compressed_allreduce').start()
    backend.compressed_allreduce(a, worker_error, server_error, local_rank)
    #deepspeed.comm.all_reduce(a_compressed)
    timers('compressed_allreduce').stop()
    time_list.append(timers('compressed_allreduce').elapsed())
#timer_names = ['compressed_allreduce']
#timers.log(names=timer_names, normalizer=1, memory_breakdown=None)
places = 2  # decimal places when rounding the mean latency
convert = 1e3  # seconds -> milliseconds
float_size = 4  # bytes per float32 element
if rank == 0:
    for i in range(iters):
        lat = time_list[i]
        print("latency = ", lat * convert)
minlat = round(min(time_list) * convert)
maxlat = round(max(time_list) * convert)
meanlat = round(mean(time_list) * convert, places)
print("min, max, and mean = {} ms, {} ms, {} ms".format(minlat, maxlat, meanlat)) if rank == 0 else None
#print("tensor shape", a.shape)
duration = meanlat / 1e3
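# Algorithm bandwidth: raw float32 payload (tensor_size * 4 bytes) per second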
tput = ((tensor_size * 4) / duration)
print("algo throughput: %f Bytes/s, %f GB/s" % (tput, tput / 1e9)) if rank == 0 else None
size = tensor_size * 4
n = dist.get_world_size()
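# Bus bandwidth applies the allreduce traffic factor 2 * (n - 1) / n to the
# algorithm bandwidth, since each byte crosses the bus ~2(n-1)/n times in a ring allreduce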
busbw = (size / duration) * (2 * (n - 1) / n)
print("busbw: %f GB/s" % (busbw / 1e9)) if rank == 0 else None