Validating GPT-2 Model Checkpoint Operations in DeepSpeed
This test suite validates DeepSpeed’s checkpoint functionality for GPT-2 model training across different configurations of model parallelism, GPU counts, and ZeRO optimization levels. It ensures proper saving and loading of model states while maintaining training parity.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
microsoft/deepspeed
tests/model/Megatron_GPT2/run_checkpoint_test.py
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
"""
Note: please copy webtext data to "Megatron-LM" folder, before running this script.
"""
import unittest
import subprocess
import os
import re
import shlex
from .test_common import BaseTestCase
LAYERS = 2
HIDDEN_SIZE = 128
ATTN_HEADS = 8
def remove_file(test_id, filename):
cmd = shlex.split(f"if [ -f {filename} ] ; then rm -v {filename}; fi")
print(f"{test_id} cmd: {cmd}")
subprocess.run(cmd, check=False, executable='/bin/bash')
def grep_loss_from_file(file_name):
loss = 0.0
with open(file_name, 'r') as f:
lines = f.readlines()
line_filter = "validation loss at the end of training for test data | LM loss:"
match_number = re.compile(r'LM loss: ([-+]?[0-9]+\.?[0-9]*(?:[Ee][-+]?[0-9]+)?)')
for line in lines:
if line_filter in line:
loss = re.findall(match_number, line)
loss = float(loss[0])
if loss == 0.0:
print("no loss found in file ", file_name)
return loss
class GPT2CheckpointTestCase(BaseTestCase):
def __init__(self, methodName="DeepSpeed function test on GPT2 model"):
super(GPT2CheckpointTestCase, self).__init__(methodName)
def setUp(self):
self.save_dir = os.getcwd()
new_dir = os.path.dirname(__file__)
if new_dir:
os.chdir(new_dir)
def tearDown(self):
os.chdir(self.save_dir)
def test_mp2_gpu4_node1_with_zero1(self):
test_config = {
"mp": 2,
"gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero1",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu8_w_zero1",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero1.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_node1_with_zero2(self):
test_config = {
"mp": 2,
"gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu8_w_zero2",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_node1_with_zero2_offload(self):
test_config = {
"mp": 2,
"gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2_offload",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu8_w_zero2_offload",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2_offload.json",
"cpu_optimizer": True,
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu1_node1_with_zero1(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 1,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero1",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero1",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero1.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu4_node1_with_zero1(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero1",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero1",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero1.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu1_node1_with_zero2(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 1,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero2",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu1_node1_with_zero2_offload(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 1,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2_offload",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero2_offload",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2_offload.json",
"cpu_optimizer": True,
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu4_node1_with_zero2(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero2",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp1_gpu2_load_gpu4_node1_with_zero2_offload(self):
test_config = {
"mp": 1,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2_offload",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero2_offload",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2_offload.json",
"cpu_optimizer": True,
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_load_gpu2_node1_with_zero1(self):
test_config = {
"mp": 2,
"gpus": 4,
"load_gpus": 2,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero1",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero1",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero1.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu2_load_gpu4_node1_with_zero1(self):
test_config = {
"mp": 2,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero1",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero1",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero1.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_load_gpu2_node1_with_zero2(self):
test_config = {
"mp": 2,
"gpus": 4,
"load_gpus": 2,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero2",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_load_gpu2_node1_with_zero2_offload(self):
test_config = {
"mp": 2,
"gpus": 4,
"load_gpus": 2,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2_offload",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero2_offload",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2_offload.json",
"cpu_optimizer": True,
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu2_load_gpu4_node1_with_zero2(self):
test_config = {
"mp": 2,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero2",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu2_load_gpu4_node1_with_zero2_offload(self):
test_config = {
"mp": 2,
"gpus": 2,
"load_gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"tag": "ds_zero2_offload",
"zero": True,
"other_args": "",
"checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero2_offload",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_zero2_offload.json",
"cpu_optimizer": True,
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def test_mp2_gpu4_node1_without_zero(self):
test_config = {
"mp": 2,
"gpus": 4,
"nodes": 1,
"bs": 8,
"steps": 1100,
"layers": LAYERS,
"hidden_size": HIDDEN_SIZE,
"seq_length": 256,
"heads": ATTN_HEADS,
"deepspeed": True,
"zero": False,
"other_args": "",
"tag": "ds_without_zero",
"checkpoint_name": "ckpt_mp4_gpu16_wo_zero",
"checkpoint_interval": 1000,
"json": "ds_config_func_bs8_no_zero.json",
}
succ = self.run_test(test_config, 0.01)
self.assertTrue(succ)
def gen_name(self, test_config, prefix):
save_dir = "checkpoint_test_logs"
tag = test_config["tag"]
checkpoint_name = test_config["checkpoint_name"]
file_name = f"_{tag}_{checkpoint_name}.log"
return os.path.join(save_dir, prefix + file_name)
def run_test(self, test_config, r_tol):
print("
")
print("{0}: starting......".format(self.id()))
# Cache save and load gpu counts
save_gpus = test_config["gpus"]
if "load_gpus" in test_config:
load_gpus = test_config["load_gpus"]
del test_config["load_gpus"]
else:
load_gpus = test_config["gpus"]
# save to current directory.
checkpoint_folder = test_config["checkpoint_name"]
checkpoint_interval = test_config["checkpoint_interval"]
checkpoint_name = test_config["checkpoint_name"]
#---------------remove old checkpoint---------------#
try:
cmd = shlex.split(f"rm -rf {checkpoint_name}")
print(f"{self.id()} cmd: {cmd}")
subprocess.run(cmd, check=False, executable='/bin/bash')
except:
print("No old checkpoint")
if "cpu_optimizer" in test_config and test_config["cpu_optimizer"]:
cpu_optimizer_flag = " --cpu-optimizer"
else:
cpu_optimizer_flag = ""
#-----------------Saving Checkpoint-----------------#
# building checkpoint arguments
test_config[
"other_args"] = f"\"--save {checkpoint_folder} --save-interval {checkpoint_interval} {cpu_optimizer_flag}\""
prefix = "gpt2_saving_checkpoint"
# create checkpoint run...
base_file = self.gen_name(test_config, prefix)
# remove previous test log
try:
cmd = shlex.split(f"rm {base_file}")
subprocess.run(cmd, check=False, executable='/bin/bash')
except:
print(f"{self.id()} No old logs")
print("{0}: Run for saving checkpoint".format(self.id()))
self.run_gpt2_test(test_config, base_file)
#-----------------Loading Checkpoint-----------------#
# building checkpoint arguments
test_config["other_args"] = f"\"--load {checkpoint_folder} {cpu_optimizer_flag} \""
# set checkpoint load iteration
try:
cmd = shlex.split(f"echo {checkpoint_interval} > {checkpoint_name}/latest_checkpointed_iteration.txt")
print(f"{self.id()} running cmd: {cmd}")
subprocess.run(cmd, check=False, executable='/bin/bash')
except:
print(f"{self.id()} Failed to update the checkpoint iteration file")
return False
prefix = "gpt2_loading_checkpoint"
# set load gpus
test_config["gpus"] = load_gpus
print("{0}: Second run loading checkpoint and continuing.".format(self.id()))
test_file = self.gen_name(test_config, prefix)
# remove previous test log
try:
cmd = shlex.split(f"rm {test_file}")
subprocess.run(cmd, check=False, executable='/bin/bash')
except:
print(f"{self.id()} no previous logs for")
self.run_gpt2_test(test_config, test_file)
return self.check_parity(base_file, test_file, r_tol)
def has_loss_data(self, file_name):
has_loss = False
if os.path.exists(file_name):
loss = grep_loss_from_file(file_name)
if loss != 0.0:
has_loss = True
return has_loss
def check_parity(self, base_file, test_file, r_tol):
base_loss = grep_loss_from_file(base_file)
test_loss = grep_loss_from_file(test_file)
print("baseline loss: {0}, test loss: {1}".format(base_loss, test_loss))
if base_loss == 0.0 or test_loss == 0.0:
return False
if abs((base_loss - test_loss) / base_loss) > r_tol:
return False
return True
def checkpoint_suite():
suite = unittest.TestSuite()
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero1'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero2'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero2_offload'))
# Shrink DP
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero1'))
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero2'))
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero2_offload'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero1'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero2'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero2_offload'))
# Expand DP
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero1'))
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero2'))
suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero2_offload'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero1'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero2'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero2_offload'))
suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_without_zero'))
return suite
if __name__ == '__main__':
runner = unittest.TextTestRunner(failfast=True)
runner.run(checkpoint_suite())