Testing Compressed Communication Performance in DeepSpeed

This test suite evaluates the performance of compressed communication in DeepSpeed, focusing on the CompressedBackend implementation for distributed training. It measures latency, throughput, and bus bandwidth for the compressed allreduce operation on a BERT-Large-scale tensor (300 * 2^20 ≈ 315M fp32 elements, on the order of BERT-Large's parameter count).

Test Coverage Overview

The test suite covers compressed allreduce communication performance, with specific focus on:
  • Tensor compression and decompression accuracy
  • Communication latency measurement
  • Throughput benchmarking
  • Bus bandwidth utilization
  • Error-feedback buffers for worker- and server-side compression (sketched after this list)
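
For context, 1-bit schemes of this kind keep only the per-element sign plus a single scale factor, and feed the quantization residual back into the next step's input. Below is a minimal sketch of that pattern in plain PyTorch; the names compress_1bit and error_buf are illustrative, not part of DeepSpeed's API:

import torch

def compress_1bit(grad: torch.Tensor, error_buf: torch.Tensor) -> torch.Tensor:
    # Re-add the residual carried over from the previous step (error feedback).
    corrected = grad + error_buf
    # One scale for the whole buffer: ||x|| / sqrt(n) preserves the L2 norm
    # of a pure sign tensor.
    scale = corrected.norm() / corrected.numel() ** 0.5
    signs = corrected.sign()
    signs[signs == 0] = 1.0  # map exact zeros to +1 so every element is +/-1
    decompressed = scale * signs
    # Remember what compression lost; it is re-added next iteration.
    error_buf.copy_(corrected - decompressed)
    return decompressed

grad = torch.randn(1024)
error_buf = torch.zeros_like(grad)
out = compress_1bit(grad, error_buf)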

Implementation Analysis

The test implements a systematic performance evaluation of DeepSpeed's CompressedBackend. It uses synchronized wall-clock timers and simple statistics to measure communication performance across distributed processes, with attention to compression overhead and network efficiency.

Key patterns include persistent worker/server error-feedback buffers, warmup iterations before timing, and statistical aggregation (min, max, mean) of per-iteration latencies; the timing skeleton is sketched below.
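
The warmup-then-measure loop is the core of the methodology. Here is a minimal sketch of the same structure against an arbitrary collective op (benchmark is an illustrative helper, not from the test):

import time
from statistics import mean

def benchmark(op, warmup=10, iters=10):
    # Untimed warmup: finish lazy initialization and warm communication paths.
    for _ in range(warmup):
        op()
    latencies = []
    for _ in range(iters):
        start = time.perf_counter()
        op()
        latencies.append(time.perf_counter() - start)
    return min(latencies), max(latencies), mean(latencies)

Note that the real test relies on DeepSpeed's SynchronizedWallClockTimer, which synchronizes the accelerator around start/stop; a bare time.perf_counter() would under-report latency for asynchronous devices unless each op is explicitly synchronized.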

Technical Details

Testing infrastructure includes:
  • PyTorch distributed communication framework
  • DeepSpeed's CompressedBackend implementation
  • SynchronizedWallClockTimer for precise, device-synchronized measurements
  • A BERT-Large-scale tensor of 300 * 2^20 ≈ 315M fp32 elements (~1.2 GB), padded for bit-packing as sketched after this list
  • Configurable warmup and iteration counts
  • Persistent error-feedback buffers for worker- and server-side compression residuals
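
The test pads the buffer so its length is divisible by 8 * world_size, which makes each rank's server chunk a whole number of bytes once signs are packed eight per byte (the packing rationale is an assumption; the script itself only performs the rounding). A small sketch of the arithmetic, with padded_size as an illustrative name:

def padded_size(tensor_size: int, world_size: int) -> int:
    # Round tensor_size up to the next multiple of 8 * world_size.
    align = 8 * world_size
    remainder = tensor_size % align
    return tensor_size if remainder == 0 else tensor_size + (align - remainder)

# 300 * 2**20 elements across 16 ranks is already aligned.
assert padded_size(300 * 2**20, 16) == 300 * 2**20
# A ragged size rounds up to the alignment boundary.
assert padded_size(1000, 16) == 1024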

Best Practices Demonstrated

The test implementation showcases several testing best practices:
  • Proper initialization of distributed environment
  • Consistent performance measurement methodology
  • Statistical analysis of results
  • Hardware-agnostic device handling
  • Comprehensive metric collection and reporting: latency, algorithmic throughput, and bus bandwidth (see the sketch after this list)
  • Proper warmup period implementation
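
The bus-bandwidth figure follows the convention used by nccl-tests: an allreduce over n ranks moves roughly 2 * (n - 1) / n times the buffer size per rank over the wire, so busbw = algbw * 2 * (n - 1) / n. A worked sketch (allreduce_busbw is an illustrative name):

def allreduce_busbw(num_bytes: int, seconds: float, world_size: int) -> float:
    algbw = num_bytes / seconds                 # algorithmic throughput, bytes/s
    factor = 2 * (world_size - 1) / world_size  # wire traffic per rank vs. payload
    return algbw * factor

# Example: a 1.2 GB payload reduced in 100 ms across 16 ranks.
bw = allreduce_busbw(1.2e9, 0.1, 16)
print(f"busbw = {bw / 1e9:.2f} GB/s")  # ~22.50 GB/s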

microsoft/deepspeed

tests/onebit/test_compressed_perf.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import deepspeed.comm as dist
import numpy as np
import argparse
import deepspeed
import os

from deepspeed.runtime.comm.compressed import CompressedBackend
from deepspeed.utils.timer import SynchronizedWallClockTimer
from deepspeed.accelerator import get_accelerator
from statistics import mean

timers = SynchronizedWallClockTimer()

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=-1)
args = parser.parse_args()

deepspeed.init_distributed(dist_backend=get_accelerator().communication_backend_name())
# The launcher exports LOCAL_RANK; it takes precedence over the --local_rank flag.
args.local_rank = int(os.environ['LOCAL_RANK'])

get_accelerator().set_device(args.local_rank)
device = torch.device(get_accelerator().device_name(), args.local_rank)

size = dist.get_world_size()
rank = dist.get_rank()

backend = CompressedBackend()
local_rank = args.local_rank

# Setting tensor_size (BERT-Large)
tensor_size = 300 * 2**20
server_size = int(tensor_size / size)
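# Pad so each rank's server chunk stays byte-aligned when signs are packed
# eight per byte (hence the 8 * world_size alignment).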
if tensor_size % (8 * size) != 0:
    right_tensor_size = tensor_size + (8 * size - (tensor_size % (8 * size)))
else:
    right_tensor_size = tensor_size
right_server_size = right_tensor_size // size

# Adding bias to the initialization of the gradient we are communicating
# In order to get rid of the case where some elements in the gradient are too small
a = (torch.rand(tensor_size, device=device) - 0.5) + 0.01 * rank

# Error-feedback buffers: they hold the residual that compression discards on
# the worker and server sides, and are carried across iterations.
worker_error = torch.zeros(right_tensor_size, device=device)
server_error = torch.zeros(right_server_size, device=device)

warmup = 10
iters = 10

# Warmup
for i in range(warmup):
    backend.compressed_allreduce(a, worker_error, server_error, local_rank)

time_list = []

# Reference sign-and-scale compression of `a` (mirroring the backend's 1-bit
# scheme), used here only to report the compressed buffer's shape.
a_sign = a.sign().add_(1).bool().float().add_(-0.5).mul_(2.0)
scale = a.norm() / np.sqrt(a.numel())
a_compressed = scale * a_sign

if rank == 0:
    print("Shape of the compressed buffer:", a_compressed.shape)

for i in range(iters):
    timers('compressed_allreduce').start()
    backend.compressed_allreduce(a, worker_error, server_error, local_rank)
    timers('compressed_allreduce').stop()
    time_list.append(timers('compressed_allreduce').elapsed())

places = 2
convert = 1e3   # seconds -> milliseconds
float_size = 4  # bytes per fp32 element

if rank == 0:
    for i in range(iters):
        print("latency = ", time_list[i] * convert)

minlat = round(min(time_list) * convert, places)
maxlat = round(max(time_list) * convert, places)
meanlat = round(mean(time_list) * convert, places)
if rank == 0:
    print("min, max, and mean = {} ms, {} ms, {} ms".format(minlat, maxlat, meanlat))

duration = meanlat / convert          # mean latency back in seconds
num_bytes = tensor_size * float_size  # fp32 payload size in bytes
tput = num_bytes / duration           # algorithmic throughput
n = dist.get_world_size()
busbw = (num_bytes / duration) * (2 * (n - 1) / n)  # ring-allreduce bus bandwidth
if rank == 0:
    print("algo throughput: %f Bytes/s, %f GB/s" % (tput, tput / 1e9))
    print("busbw: %f GB/s" % (busbw / 1e9))