Back to Repositories

Validating GPT-2 Model Checkpoint Operations in DeepSpeed

This test suite validates DeepSpeed’s checkpoint functionality for GPT-2 model training across different configurations of model parallelism, GPU counts, and ZeRO optimization levels. It ensures proper saving and loading of model states while maintaining training parity.

Test Coverage Overview

The test suite provides comprehensive coverage of checkpoint functionality across multiple dimensions:
  • Model parallelism (MP) configurations from 1-2
  • GPU counts ranging from 1-4
  • ZeRO optimization stages 1 and 2 with CPU offloading
  • Checkpoint saving and loading with different GPU configurations
  • Validation of training loss parity between saved and loaded states

Implementation Analysis

The testing approach uses a systematic configuration matrix to validate checkpoint functionality. It employs unittest framework with parameterized test cases that verify both saving and loading checkpoints. Each test validates training loss convergence within specified tolerance across different execution configurations.

Technical Details

Key technical components include:
  • unittest framework for test organization
  • Subprocess management for running training
  • File system operations for checkpoint management
  • Regular expressions for parsing training logs
  • Configuration JSON files for DeepSpeed settings
  • Loss validation with relative tolerance checks

Best Practices Demonstrated

The test implementation showcases several testing best practices:
  • Modular test case design with clear setup/teardown
  • Comprehensive parameter validation
  • Robust error handling and cleanup
  • Detailed logging and progress tracking
  • Configurable tolerance thresholds
  • Systematic test suite organization

microsoft/deepspeed

tests/model/Megatron_GPT2/run_checkpoint_test.py

            
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Note: please copy webtext data to "Megatron-LM" folder, before running this script.
"""

import unittest
import subprocess
import os
import re
import shlex
from .test_common import BaseTestCase

LAYERS = 2
HIDDEN_SIZE = 128
ATTN_HEADS = 8


def remove_file(test_id, filename):
    cmd = shlex.split(f"if [ -f {filename} ] ; then rm -v {filename}; fi")
    print(f"{test_id} cmd: {cmd}")
    subprocess.run(cmd, check=False, executable='/bin/bash')


def grep_loss_from_file(file_name):
    loss = 0.0

    with open(file_name, 'r') as f:
        lines = f.readlines()
        line_filter = "validation loss at the end of training for test data | LM loss:"
        match_number = re.compile(r'LM loss: ([-+]?[0-9]+\.?[0-9]*(?:[Ee][-+]?[0-9]+)?)')

        for line in lines:
            if line_filter in line:
                loss = re.findall(match_number, line)
                loss = float(loss[0])

    if loss == 0.0:
        print("no loss found in file ", file_name)

    return loss


class GPT2CheckpointTestCase(BaseTestCase):

    def __init__(self, methodName="DeepSpeed function test on GPT2 model"):
        super(GPT2CheckpointTestCase, self).__init__(methodName)

    def setUp(self):
        self.save_dir = os.getcwd()
        new_dir = os.path.dirname(__file__)
        if new_dir:
            os.chdir(new_dir)

    def tearDown(self):
        os.chdir(self.save_dir)

    def test_mp2_gpu4_node1_with_zero1(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero1",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu8_w_zero1",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero1.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_node1_with_zero2(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu8_w_zero2",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_node1_with_zero2_offload(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2_offload",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu8_w_zero2_offload",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2_offload.json",
            "cpu_optimizer": True,
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu1_node1_with_zero1(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 1,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero1",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero1",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero1.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu4_node1_with_zero1(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero1",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero1",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero1.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu1_node1_with_zero2(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 1,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero2",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu1_node1_with_zero2_offload(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 1,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2_offload",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu1_w_zero2_offload",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2_offload.json",
            "cpu_optimizer": True,
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu4_node1_with_zero2(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero2",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp1_gpu2_load_gpu4_node1_with_zero2_offload(self):
        test_config = {
            "mp": 1,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2_offload",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp1_gpu2_gpu4_w_zero2_offload",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2_offload.json",
            "cpu_optimizer": True,
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_load_gpu2_node1_with_zero1(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "load_gpus": 2,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero1",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero1",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero1.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu2_load_gpu4_node1_with_zero1(self):
        test_config = {
            "mp": 2,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero1",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero1",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero1.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_load_gpu2_node1_with_zero2(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "load_gpus": 2,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero2",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_load_gpu2_node1_with_zero2_offload(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "load_gpus": 2,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2_offload",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu4_gpu2_w_zero2_offload",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2_offload.json",
            "cpu_optimizer": True,
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu2_load_gpu4_node1_with_zero2(self):
        test_config = {
            "mp": 2,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero2",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu2_load_gpu4_node1_with_zero2_offload(self):
        test_config = {
            "mp": 2,
            "gpus": 2,
            "load_gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "tag": "ds_zero2_offload",
            "zero": True,
            "other_args": "",
            "checkpoint_name": "ckpt_mp2_gpu2_gpu4_w_zero2_offload",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_zero2_offload.json",
            "cpu_optimizer": True,
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def test_mp2_gpu4_node1_without_zero(self):
        test_config = {
            "mp": 2,
            "gpus": 4,
            "nodes": 1,
            "bs": 8,
            "steps": 1100,
            "layers": LAYERS,
            "hidden_size": HIDDEN_SIZE,
            "seq_length": 256,
            "heads": ATTN_HEADS,
            "deepspeed": True,
            "zero": False,
            "other_args": "",
            "tag": "ds_without_zero",
            "checkpoint_name": "ckpt_mp4_gpu16_wo_zero",
            "checkpoint_interval": 1000,
            "json": "ds_config_func_bs8_no_zero.json",
        }
        succ = self.run_test(test_config, 0.01)
        self.assertTrue(succ)

    def gen_name(self, test_config, prefix):
        save_dir = "checkpoint_test_logs"
        tag = test_config["tag"]
        checkpoint_name = test_config["checkpoint_name"]
        file_name = f"_{tag}_{checkpoint_name}.log"
        return os.path.join(save_dir, prefix + file_name)

    def run_test(self, test_config, r_tol):
        print("
")

        print("{0}: starting......".format(self.id()))

        # Cache save and load gpu counts
        save_gpus = test_config["gpus"]
        if "load_gpus" in test_config:
            load_gpus = test_config["load_gpus"]
            del test_config["load_gpus"]
        else:
            load_gpus = test_config["gpus"]

        # save to current directory.
        checkpoint_folder = test_config["checkpoint_name"]
        checkpoint_interval = test_config["checkpoint_interval"]
        checkpoint_name = test_config["checkpoint_name"]
        #---------------remove old checkpoint---------------#
        try:
            cmd = shlex.split(f"rm -rf {checkpoint_name}")
            print(f"{self.id()} cmd: {cmd}")
            subprocess.run(cmd, check=False, executable='/bin/bash')
        except:
            print("No old checkpoint")

        if "cpu_optimizer" in test_config and test_config["cpu_optimizer"]:
            cpu_optimizer_flag = " --cpu-optimizer"
        else:
            cpu_optimizer_flag = ""

        #-----------------Saving Checkpoint-----------------#
        # building checkpoint arguments
        test_config[
            "other_args"] = f"\"--save {checkpoint_folder} --save-interval {checkpoint_interval} {cpu_optimizer_flag}\""

        prefix = "gpt2_saving_checkpoint"

        # create checkpoint run...
        base_file = self.gen_name(test_config, prefix)

        # remove previous test log
        try:
            cmd = shlex.split(f"rm {base_file}")
            subprocess.run(cmd, check=False, executable='/bin/bash')
        except:
            print(f"{self.id()} No old logs")

        print("{0}: Run for saving checkpoint".format(self.id()))
        self.run_gpt2_test(test_config, base_file)

        #-----------------Loading Checkpoint-----------------#

        # building checkpoint arguments
        test_config["other_args"] = f"\"--load {checkpoint_folder} {cpu_optimizer_flag} \""

        # set checkpoint load iteration
        try:
            cmd = shlex.split(f"echo {checkpoint_interval} > {checkpoint_name}/latest_checkpointed_iteration.txt")
            print(f"{self.id()} running cmd: {cmd}")
            subprocess.run(cmd, check=False, executable='/bin/bash')
        except:
            print(f"{self.id()} Failed to update the checkpoint iteration file")
            return False

        prefix = "gpt2_loading_checkpoint"

        # set load gpus
        test_config["gpus"] = load_gpus

        print("{0}: Second run loading checkpoint and continuing.".format(self.id()))
        test_file = self.gen_name(test_config, prefix)

        # remove previous test log
        try:
            cmd = shlex.split(f"rm {test_file}")
            subprocess.run(cmd, check=False, executable='/bin/bash')
        except:
            print(f"{self.id()} no previous logs for")
        self.run_gpt2_test(test_config, test_file)
        return self.check_parity(base_file, test_file, r_tol)

    def has_loss_data(self, file_name):
        has_loss = False
        if os.path.exists(file_name):
            loss = grep_loss_from_file(file_name)
            if loss != 0.0:
                has_loss = True

        return has_loss

    def check_parity(self, base_file, test_file, r_tol):
        base_loss = grep_loss_from_file(base_file)
        test_loss = grep_loss_from_file(test_file)

        print("baseline loss: {0}, test loss: {1}".format(base_loss, test_loss))

        if base_loss == 0.0 or test_loss == 0.0:
            return False

        if abs((base_loss - test_loss) / base_loss) > r_tol:
            return False

        return True


def checkpoint_suite():
    suite = unittest.TestSuite()

    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero1'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero2'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_with_zero2_offload'))

    # Shrink DP
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero1'))
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero2'))
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu1_node1_with_zero2_offload'))

    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero1'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero2'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_load_gpu2_node1_with_zero2_offload'))

    # Expand DP
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero1'))
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero2'))
    suite.addTest(GPT2CheckpointTestCase('test_mp1_gpu2_load_gpu4_node1_with_zero2_offload'))

    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero1'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero2'))
    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu2_load_gpu4_node1_with_zero2_offload'))

    suite.addTest(GPT2CheckpointTestCase('test_mp2_gpu4_node1_without_zero'))

    return suite


if __name__ == '__main__':
    runner = unittest.TextTestRunner(failfast=True)
    runner.run(checkpoint_suite())