
Validating Learning Rate Scheduler Implementations in DeepSpeed

This test suite validates learning rate scheduling functionality in DeepSpeed, covering the warmup, warmup-decay, one-cycle, warmup-cosine, and LR range test scheduler implementations. The tests verify proper initialization, step-by-step progression, and boundary behavior of each schedule during model training.

Test Coverage Overview

The test suite provides comprehensive coverage of DeepSpeed’s learning rate schedulers (a minimal configuration sketch follows this list), including:
  • Warmup LR scheduling with linear and logarithmic rates
  • One-cycle learning rate policy with momentum adjustments
  • Cosine learning rate scheduling with warmup
  • LR range testing functionality
  • Scheduler-optimizer parity verification
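
Each policy is selected through the "scheduler" block of a DeepSpeed config. The sketch below is a minimal, illustrative configuration for the WarmupLR policy; ds_config is a hypothetical name, the string keys are assumed to be the values behind the constants imported in the test file (e.g. WARMUP_MIN_LR), and the numbers mirror the warmup tests further down.

# Minimal DeepSpeed config sketch selecting the WarmupLR scheduler
# (illustrative; the real tests build these dicts inline and parametrize them).
ds_config = {
    "train_batch_size": 2,
    "optimizer": {"type": "Adam", "params": {"lr": 0.00015}},
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0.1,
            "warmup_max_lr": 0.2,
            "warmup_num_steps": 10,
            "warmup_type": "linear",
        },
    },
    "gradient_clipping": 1.0,
}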

Implementation Analysis

The testing approach uses pytest parametrization to validate multiple scheduler configurations and edge cases. Tests verify continuous increase/decrease patterns, staircase behavior, and proper initialization states. The implementation leverages DeepSpeed’s distributed testing infrastructure, with helper functions that verify monotonic increase, monotonic decrease, and staircase progressions.

Key patterns include continuous value progression checks, boundary condition validation, and optimizer state synchronization tests.
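
As a rough illustration of that verification pattern, the sketch below checks a recorded list of per-step learning rates for a strict increase during warmup followed by a plateau. check_warmup_shape is a hypothetical helper; the real tests use the _verify_* helpers and inline assertions shown in the file below.

# Illustrative verification sketch (hypothetical helper, not part of the test file).
# step_lrs holds one learning rate per optimizer step.
def check_warmup_shape(step_lrs, warmup_num_steps, max_lr):
    warmup = step_lrs[:warmup_num_steps]
    # strictly increasing while warming up
    assert all(a < b for a, b in zip(warmup, warmup[1:]))
    # constant at max_lr once warmup completes
    assert all(lr == max_lr for lr in step_lrs[warmup_num_steps:])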

Technical Details

Testing tools and configuration (a condensed scaffolding sketch follows this list):
  • pytest framework with parametrize decorators
  • DeepSpeed DistributedTest base class
  • SimpleModel test fixture
  • Random data loader generation
  • Adam optimizer integration
  • Custom verification utilities for monotonic behaviors
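
The tests share a single scaffolding pattern built from the pieces above: construct a config dict, wrap SimpleModel with deepspeed.initialize, train on random data, and record the scheduler's learning rate at every step. A condensed sketch, with run_schedule as a hypothetical helper (the tests inline this logic rather than factoring it out):

# Condensed sketch of the scaffolding shared by the tests below;
# run_schedule is a hypothetical helper, not part of the test file.
import torch
import deepspeed
from unit.simple_model import SimpleModel, random_dataloader

def run_schedule(config_dict, hidden_dim=10, total_samples=50):
    model = SimpleModel(hidden_dim, empty_grad=False)
    model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                     model=model,
                                                     model_parameters=model.parameters())
    data_loader = random_dataloader(model=model,
                                    total_samples=total_samples,
                                    hidden_dim=hidden_dim,
                                    device=model.device,
                                    dtype=torch.float)
    step_lrs = []
    for _, batch in enumerate(data_loader):
        loss = model(batch[0], batch[1])
        model.backward(loss)
        model.step()
        step_lrs.append(lr_scheduler.get_lr())  # one list of lrs per step
    return step_lrs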

Best Practices Demonstrated

The test suite exemplifies testing best practices through comprehensive parameter coverage, isolated test cases, and thorough verification logic. Notable practices include (see the parametrization sketch after this list):
  • Systematic parameter space exploration
  • Clear separation of test scenarios
  • Robust verification helper functions
  • Integration with distributed testing infrastructure
  • Proper test isolation and setup
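
The parameter-space exploration comes from stacking pytest parametrize decorators, which generates the Cartesian product of all cases; the warmup tests below combine four step counts with two warmup types into eight cases per test method. A standalone sketch of the pattern:

# Standalone sketch of stacked parametrization: 4 x 2 = 8 generated cases.
import pytest

@pytest.mark.parametrize("warmup_num_steps", [10, 15, 19, 33])
@pytest.mark.parametrize("warmup_type", ["log", "linear"])
def test_parametrization_sketch(warmup_num_steps, warmup_type):
    assert warmup_num_steps > 0
    assert warmup_type in ("log", "linear")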

microsoft/deepspeed

tests/unit/runtime/test_lr_schedulers.py

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import torch
import deepspeed
import pytest
from unit.common import DistributedTest
from unit.simple_model import SimpleModel, random_dataloader
from deepspeed.runtime.lr_schedules import LR_RANGE_TEST, LR_RANGE_TEST_MIN_LR, LR_RANGE_TEST_STEP_RATE, LR_RANGE_TEST_STEP_SIZE, LR_RANGE_TEST_STAIRCASE
from deepspeed.runtime.lr_schedules import WARMUP_LR, WARMUP_MIN_LR, WARMUP_MAX_LR, WARMUP_NUM_STEPS, WARMUP_TYPE, WARMUP_LOG_RATE, WARMUP_LINEAR_RATE
from deepspeed.runtime.lr_schedules import ONE_CYCLE, CYCLE_MIN_LR, CYCLE_MAX_LR, CYCLE_FIRST_STEP_SIZE, DECAY_LR_RATE, DECAY_STEP_SIZE
from deepspeed.runtime.lr_schedules import CYCLE_MIN_MOM, CYCLE_MAX_MOM, DECAY_MOM_RATE
from deepspeed.runtime.lr_schedules import WARMUP_DECAY_LR, TOTAL_NUM_STEPS
from deepspeed.runtime.lr_schedules import WARMUP_COSINE_LR, WARMUP_MIN_RATIO, COS_MIN_RATIO


def _verify_continuous_decrease(values):
    for i in range(len(values) - 1):
        assert values[i] > values[i + 1]


def _verify_continuous_increase(values):
    for i in range(len(values) - 1):
        assert values[i] < values[i + 1]


def _verify_staircase_increase(values, step_size):
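    # Within each consecutive window of step_size steps, the value must hold constant.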
    num_values = len(values)
    for i in range(0, num_values, step_size):
        j = min(i + step_size, num_values)
        assert all([values[i] == v for v in values[i:j]])


@pytest.mark.parametrize("scheduler_type,params", [(WARMUP_LR, {}),
                                                   (WARMUP_DECAY_LR, {
                                                       WARMUP_NUM_STEPS: 10,
                                                       TOTAL_NUM_STEPS: 20
                                                   }), (WARMUP_COSINE_LR, {
                                                       WARMUP_NUM_STEPS: 10,
                                                       TOTAL_NUM_STEPS: 20
                                                   }), (ONE_CYCLE, {
                                                       CYCLE_MIN_LR: 0,
                                                       CYCLE_MAX_LR: 0.1
                                                   }), (LR_RANGE_TEST, {})])
class TestGetLrBeforeTrain(DistributedTest):
    world_size = 1

    def test(self, scheduler_type, params):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": scheduler_type,
                "params": params
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)

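        # Before any training step, the scheduler's lrs should already be applied
        # to the optimizer's param groups.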
        true_lrs = lr_scheduler.get_lr()
        for group, true_lr in zip(model.optimizer.param_groups, true_lrs):
            assert group['lr'] == true_lr, f"True lr {true_lr}, optimizer lr {group['lr']}"

        for n, batch in enumerate(data_loader):
            # exercise get_lr() before each training step
            lr_scheduler.get_lr()
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()


@pytest.mark.parametrize("warmup_num_steps", [10, 15, 19, 33])
@pytest.mark.parametrize("warmup_type", [WARMUP_LOG_RATE, WARMUP_LINEAR_RATE])
class TestLrSchedule(DistributedTest):
    world_size = 1

    def test_lr_warmup_schedule(self, warmup_num_steps, warmup_type):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": WARMUP_LR,
                "params": {
                    WARMUP_MIN_LR: 0.1,
                    WARMUP_MAX_LR: 0.2,
                    WARMUP_NUM_STEPS: warmup_num_steps,
                    WARMUP_TYPE: warmup_type,
                }
            },
            "gradient_clipping": 1.0
        }
        schedule_params = config_dict["scheduler"]["params"]
        total_num_steps = 2 * warmup_num_steps
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())

        data_loader = random_dataloader(model=model,
                                        total_samples=total_num_steps * 2,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)
        step_lrs = []
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            step_lrs.append(lr_scheduler.get_lr())

        # Verify initial lr
        assert step_lrs[0] == [schedule_params[WARMUP_MIN_LR]]

        # Verify warmup completion
        warmup_num_steps = schedule_params[WARMUP_NUM_STEPS]
        warmup_max_lr = [schedule_params[WARMUP_MAX_LR]]
        assert step_lrs[warmup_num_steps] == warmup_max_lr

        # Verify lr stays at warmup_max_lr after warmup completes
        assert all([warmup_max_lr == lr for lr in step_lrs[warmup_num_steps:]])

    def test_lr_warmup_decay_schedule(self, warmup_num_steps, warmup_type):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": WARMUP_DECAY_LR,
                "params": {
                    WARMUP_MIN_LR: 0.1,
                    WARMUP_MAX_LR: 0.2,
                    WARMUP_NUM_STEPS: warmup_num_steps,
                    TOTAL_NUM_STEPS: warmup_num_steps * 2,
                    WARMUP_TYPE: warmup_type
                }
            },
            "gradient_clipping": 1.0
        }
        schedule_params = config_dict["scheduler"]["params"]
        total_num_steps = schedule_params[TOTAL_NUM_STEPS]
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())

        data_loader = random_dataloader(model=model,
                                        total_samples=total_num_steps * 2,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)
        step_lrs = []
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            step_lrs.append(lr_scheduler.get_lr())

        # Verify initial lr
        assert step_lrs[0] == [schedule_params[WARMUP_MIN_LR]]

        # Verify lr at warmup completion
        warmup_num_steps = schedule_params[WARMUP_NUM_STEPS]
        warmup_max_lr = [schedule_params[WARMUP_MAX_LR]]
        assert step_lrs[warmup_num_steps] == warmup_max_lr

        # Verify decay phase
        previous_lr = warmup_max_lr
        for lr in step_lrs[warmup_num_steps + 1:]:
            assert lr < previous_lr
            previous_lr = lr


@pytest.mark.parametrize("scheduler_type,params", [(WARMUP_LR, {}),
                                                   (WARMUP_DECAY_LR, {
                                                       WARMUP_NUM_STEPS: 5,
                                                       TOTAL_NUM_STEPS: 10
                                                   }),
                                                   (ONE_CYCLE, {
                                                       CYCLE_MIN_LR: 0,
                                                       CYCLE_MAX_LR: 0.1,
                                                       CYCLE_FIRST_STEP_SIZE: 5,
                                                       DECAY_STEP_SIZE: 5
                                                   }),
                                                   (LR_RANGE_TEST, {
                                                       LR_RANGE_TEST_MIN_LR: 1e-4,
                                                       LR_RANGE_TEST_STEP_SIZE: 1
                                                   })])
class TestSchedulerOptimizerParity(DistributedTest):
    world_size = 1

    def test(self, scheduler_type, params):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": scheduler_type,
                "params": params
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=50,
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)
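        # After every optimizer step, the scheduler and the DeepSpeed engine
        # must report identical learning rates.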
        for n, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            assert lr_scheduler.get_lr() == model.get_lr()


@pytest.mark.parametrize("min_lr, step_rate, step_size, staircase",
                         [(1e-4, 1e-5, 1, True),
                          (1e-5, 1e-5, 1, False),
                          (1e-4, 1e-3, 10, True),
                          (1e-3, 1e-3, 10, False),
                          (1e-2, 1e-2, 19, True),
                          (1e-2, 1e-2, 19, False)
                          ])  # yapf: disable
class TestLrRange(DistributedTest):
    world_size = 1

    def test(self, min_lr, step_rate, step_size, staircase):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": LR_RANGE_TEST,
                "params": {
                    LR_RANGE_TEST_MIN_LR: min_lr,
                    LR_RANGE_TEST_STEP_RATE: step_rate,
                    LR_RANGE_TEST_STEP_SIZE: step_size,
                    LR_RANGE_TEST_STAIRCASE: staircase
                }
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=max(50, step_size * 2),
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)

        step_lrs = []
        for _, batch in enumerate(data_loader):
            step_lrs.extend(lr_scheduler.get_lr())
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

        # Verify starting lr
        assert step_lrs[0] == min_lr

        if staircase:
            # Verify staircase increasing lr
            _verify_staircase_increase(step_lrs, step_size)
        else:
            # Verify continuous increasing lr
            _verify_continuous_increase(step_lrs)


class TestOneCycle(DistributedTest):
    world_size = 1

    @pytest.mark.parametrize("min_lr, max_lr, decay_rate, cycle_step_size, decay_step_size",
                             [
                                 (1e-5, 1e-2, 1e-3, 10, 10),
                                 (1e-3, 1e-1, 0, 21, 21),
                                 (1e-5, 1e-2, 1e-3, 10, 10),
                                 (1e-3, 1e-1, 1e-1, 21, 21),
                                 (1e-5, 1e-1, 0, 10, 0),
                             ])  # yapf: disable
    def test_lr(self, min_lr, max_lr, decay_rate, cycle_step_size, decay_step_size):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": ONE_CYCLE,
                "params": {
                    CYCLE_MIN_LR: min_lr,
                    CYCLE_MAX_LR: max_lr,
                    DECAY_LR_RATE: decay_rate,
                    CYCLE_FIRST_STEP_SIZE: cycle_step_size,
                    DECAY_STEP_SIZE: decay_step_size
                }
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=max(50, cycle_step_size * 3),
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)

        step_lrs = []
        for _, batch in enumerate(data_loader):
            step_lrs.extend(lr_scheduler.get_lr())
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

        # Verify starting lr
        assert step_lrs[0] == min_lr

        # Verify peak lr
        assert step_lrs[cycle_step_size] == max_lr

        # Verify increasing phase
        _verify_continuous_increase(step_lrs[:cycle_step_size])

        # Verify decreasing phase
        _verify_continuous_decrease(step_lrs[cycle_step_size:(cycle_step_size * 2)])

        # Verify decay phase
        if decay_rate > 0:
            _verify_continuous_decrease(step_lrs[(cycle_step_size * 2):])

    @pytest.mark.parametrize("min_mom, max_mom, decay_rate, step_size",
                             [
                                 (0.08, 0.09, 1e-3, 10),
                                 (0.08, 0.09, 0, 21),
                                 (0.08, 0.09, 1e-3, 10),
                                 (0.08, 0.09, 0, 21),
                             ]) # yapf: disable
    def test_mom(self, min_mom, max_mom, decay_rate, step_size):
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": 0.00015
                },
            },
            "scheduler": {
                "type": ONE_CYCLE,
                "params": {
                    CYCLE_MIN_LR: 1e-3,
                    CYCLE_MAX_LR: 1e-2,
                    CYCLE_MIN_MOM: min_mom,
                    CYCLE_MAX_MOM: max_mom,
                    DECAY_MOM_RATE: decay_rate,
                    CYCLE_FIRST_STEP_SIZE: step_size,
                    DECAY_STEP_SIZE: step_size
                }
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=max(50, step_size * 3),
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)

        step_moms = []
        for _, batch in enumerate(data_loader):
            step_moms.append(lr_scheduler.get_mom())
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()

        # Verify starting momentum (first param group, first momentum value)
        assert step_moms[0][0][0] == max_mom

        # Verify momentum reaches its minimum at the end of the first cycle phase
        assert step_moms[step_size][0][0] == min_mom

        # Verify decreasing phase
        _verify_continuous_decrease(step_moms[:step_size])

        # Verify increasing phase
        _verify_continuous_increase(step_moms[step_size:(step_size * 2)])

        # Verify decay phase
        if decay_rate > 0:
            _verify_continuous_increase(step_moms[(step_size * 2):])


class TestWarmupCosineLR(DistributedTest):
    world_size = 1

    @pytest.mark.parametrize("total_num_steps, warmup_num_steps, cos_min_ratio, warmup_min_ratio",
                             [
                                 (100, 10, 0.1, 0.2),
                                 (200, 20, 0.1, 0.2),
                                 (500, 30, 0.0, 0.2),
                                 (600, 300, 0.1, 0.0),
                                 (600, 550, 0.0, 0.0),
                             ])  # yapf: disable
    def test_lr(self, total_num_steps, warmup_num_steps, cos_min_ratio, warmup_min_ratio):
        opt_lr = 0.0015
        config_dict = {
            "train_batch_size": 2,
            "steps_per_print": 1,
            "optimizer": {
                "type": "Adam",
                "params": {
                    "lr": opt_lr
                },
            },
            "scheduler": {
                "type": WARMUP_COSINE_LR,
                "params": {
                    TOTAL_NUM_STEPS: total_num_steps,
                    WARMUP_MIN_RATIO: warmup_min_ratio,
                    WARMUP_NUM_STEPS: warmup_num_steps,
                    COS_MIN_RATIO: cos_min_ratio,
                }
            },
            "gradient_clipping": 1.0
        }
        hidden_dim = 10

        model = SimpleModel(hidden_dim, empty_grad=False)
        model, _, _, lr_scheduler = deepspeed.initialize(config=config_dict,
                                                         model=model,
                                                         model_parameters=model.parameters())
        data_loader = random_dataloader(model=model,
                                        total_samples=max(50, total_num_steps * 3),
                                        hidden_dim=hidden_dim,
                                        device=model.device,
                                        dtype=torch.float)

        step_lrs = []
        for _, batch in enumerate(data_loader):
            loss = model(batch[0], batch[1])
            model.backward(loss)
            model.step()
            step_lrs.extend(lr_scheduler.get_lr())

        # Verify starting lr
        assert abs(step_lrs[0] - opt_lr * warmup_min_ratio) < 1e-7

        # Verify peak lr
        assert abs(step_lrs[warmup_num_steps - 1] - opt_lr) < 1e-7

        # Verify end lr
        assert abs(step_lrs[total_num_steps - 1] - opt_lr * cos_min_ratio) < 1e-7

        # Verify increasing phase
        _verify_continuous_increase(step_lrs[:warmup_num_steps])

        # Verify decreasing phase
        _verify_continuous_decrease(step_lrs[warmup_num_steps:total_num_steps])