Back to Repositories

Testing Memory Block Allocation Management in DeepSpeed

This test suite validates the BlockedAllocator implementation in DeepSpeed’s inference V2 module, focusing on memory block allocation and deallocation functionality. The tests ensure proper memory management, block tracking, and error handling for the allocator component.

Test Coverage Overview

The test suite provides comprehensive coverage of BlockedAllocator functionality:
  • Initialization validation with boundary checks
  • Basic allocation and deallocation operations
  • Memory block tracking and management
  • Error handling for invalid operations
  • Long-running stability tests with random allocation patterns

Implementation Analysis

The testing approach implements systematic validation of memory management operations:
  • Parametrized tests for error conditions
  • Block allocation tracking and uniqueness verification
  • Memory leak detection through allocation/deallocation balance checks
  • Stress testing with randomized operations

Technical Details

Testing infrastructure and tools:
  • PyTest framework with parametrization
  • PyTorch tensor operations for block management
  • Allocator instances constructed directly inside each test (no shared pytest fixtures)
  • Random number generation for stress testing
  • Assertion-based validation for memory integrity

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices:
  • Comprehensive edge case coverage
  • Systematic error condition testing
  • Long-running stability verification
  • Clear test organization and documentation
  • Efficient use of parametrized testing

microsoft/deepspeed

tests/unit/inference/v2/ragged/test_blocked_allocator.py

            
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import random
from typing import List

import pytest
import torch

from deepspeed.inference.v2.ragged.blocked_allocator import BlockedAllocator


@pytest.mark.inference_v2
@pytest.mark.parametrize('bad_size', [0, -1])
def test_bad_initialization(bad_size: int) -> None:
    """
    Constructing an allocator with a zero or negative block count must raise ``ValueError``.
    """
    rejects_size = pytest.raises(ValueError)
    with rejects_size:
        BlockedAllocator(bad_size)


@pytest.mark.inference_v2
def test_allocation() -> None:
    """
    Exhaust a 16-block allocator through allocations of several sizes.

    Verifies that every allocation returns the requested number of blocks, that
    ``free_blocks`` tracks each allocation, that allocating from an exhausted
    allocator raises ``ValueError``, and that no block id is handed out twice.
    """

    allocator = BlockedAllocator(16)

    a1 = allocator.allocate(4)
    assert a1.numel() == 4
    assert allocator.free_blocks == 12

    a2_allocs = []
    for i in range(3):
        alloc = allocator.allocate(2)
        # Check the size of each small allocation as well, as is done for a1 and a3.
        assert alloc.numel() == 2
        a2_allocs.append(alloc)
        assert allocator.free_blocks == 12 - (i + 1) * 2

    a3 = allocator.allocate(6)
    assert a3.numel() == 6

    assert allocator.free_blocks == 0

    # Test that we can't allocate more blocks than we have.
    with pytest.raises(ValueError):
        allocator.allocate(1)

    # Every block id across all live allocations must be distinct.
    all_vals = torch.cat([a1, *a2_allocs, a3], dim=0)
    unique_vals = torch.unique(all_vals, sorted=False)
    assert unique_vals.numel() == all_vals.numel()


@pytest.mark.inference_v2
def test_too_large_allocation() -> None:
    """
    Requesting more blocks than the allocator was created with must raise ``ValueError``.
    """
    allocator = BlockedAllocator(16)

    with pytest.raises(ValueError):
        allocator.allocate(17)

    # A rejected request must not consume any blocks.
    assert allocator.free_blocks == 16


@pytest.mark.inference_v2
def test_deallocation() -> None:
    """
    Check that freeing blocks returns them to the pool and that double frees are rejected.

    Covers freeing a full allocation at once, freeing part of an allocation in
    four-block chunks, and attempting to free chunks that were already freed.
    """
    allocator = BlockedAllocator(16)

    # Drain the allocator, then hand everything back in a single call.
    everything = allocator.allocate(16)
    assert allocator.free_blocks == 0
    allocator.free(everything)
    assert allocator.free_blocks == 16

    # Drain it again and return only the first eight blocks, four at a time.
    everything = allocator.allocate(16)
    chunks = (everything[:4], everything[4:8])

    for chunks_returned, chunk in enumerate(chunks, start=1):
        allocator.free(chunk)
        assert allocator.free_blocks == 4 * chunks_returned

    # Both chunks are free now, so freeing either of them again must be rejected.
    for chunk in chunks:
        with pytest.raises(ValueError):
            allocator.free(chunk)


@pytest.mark.inference_v2
@pytest.mark.parametrize('index', [-1, 2])
def test_invalid_dealloc_indices(index: int) -> None:
    """
    Freeing an out-of-range index on a one-block allocator must raise ``ValueError``.
    """
    allocator = BlockedAllocator(1)

    with pytest.raises(ValueError):
        allocator.free(torch.tensor([index]))

    # The rejected free must not change the number of free blocks.
    assert allocator.free_blocks == 1


@pytest.mark.inference_v2
@pytest.mark.parametrize('index', [-1, 2])
def test_invalid_alloc_indices(index: int) -> None:
    """
    A free request mixing a valid block with an invalid index must be rejected as a whole.

    Despite the name, this exercises ``free``: the single block of a one-block
    allocator is allocated, then freed together with an out-of-range index.
    The call must raise ``ValueError`` and leave the valid block allocated.
    """
    allocator = BlockedAllocator(1)
    allocator.allocate(1)

    to_free = [0, index]

    with pytest.raises(ValueError):
        allocator.free(torch.tensor(to_free))

    # Block 0 should not be freed if passed with an invalid index.
    assert allocator.free_blocks == 0

    # Freeing the valid block on its own still works afterwards.
    allocator.free(torch.tensor([0]))
    assert allocator.free_blocks == 1


@pytest.mark.inference_v2
@pytest.mark.parametrize('test_iters', [8192])
def test_long_running_allocation(test_iters: int) -> None:
    """
    Evaluate the stability of the allocator over a longer sequence of allocations/deallocations.

    Each iteration randomly either allocates 1-24 blocks (expecting ``ValueError``
    when that exceeds the free block count) or frees one randomly chosen live
    allocation. After every iteration the live allocations are checked for
    duplicate block ids, and at the end the allocation/free counters are
    reconciled against ``allocator.free_blocks``.

    Arguments:
        test_iters: Number of random allocate/free steps to perform.
    """
    TOTAL_BLOCKS = 128

    # Draw a fresh seed on every run so coverage still varies, but print it: pytest
    # reports captured stdout for failing tests, so a failing sequence can be
    # replayed by hard-coding the reported seed here.
    seed = random.randrange(2**32)
    print(f'test_long_running_allocation seed: {seed}')
    rng = random.Random(seed)

    allocator = BlockedAllocator(TOTAL_BLOCKS)

    def validate_uniqueness(all_blocks: List[torch.Tensor]) -> None:
        # No more blocks may be live than exist, and none may appear twice.
        all_vals = torch.cat(all_blocks, dim=0)
        assert all_vals.numel() <= TOTAL_BLOCKS

        unique_vals = torch.unique(all_vals, sorted=False)
        assert unique_vals.numel() == all_vals.numel()

    all_allocs: List[torch.Tensor] = []
    num_allocs = 0
    num_frees = 0
    num_blocks_allocated = 0
    num_blocks_freed = 0

    for _ in range(test_iters):
        decision = rng.randint(0, 1)

        if decision == 0:
            blocks_to_allocate = rng.randint(1, 24)
            if blocks_to_allocate > allocator.free_blocks:
                # Oversized requests must be rejected; they are not counted as allocations.
                with pytest.raises(ValueError):
                    allocator.allocate(blocks_to_allocate)
            else:
                all_allocs.append(allocator.allocate(blocks_to_allocate))
                num_allocs += 1
                num_blocks_allocated += blocks_to_allocate
        elif all_allocs:
            idx = rng.randrange(len(all_allocs))
            allocator.free(all_allocs[idx])

            num_frees += 1
            num_blocks_freed += all_allocs[idx].numel()

            del all_allocs[idx]

        if all_allocs:
            validate_uniqueness(all_allocs)

    # Every successful allocation was either freed or is still live.
    assert num_allocs == num_frees + len(all_allocs)
    # Blocks handed out minus blocks returned must equal the blocks currently in use.
    assert num_blocks_allocated == num_blocks_freed + (TOTAL_BLOCKS - allocator.free_blocks)