Testing Pipeline Parallelism Splitting Strategies in ColossalAI

This test suite validates pipeline parallelism functionality in ColossalAI by testing different splitting strategies on a simple MLP model. It ensures that model outputs remain consistent before and after pipeline transformations.

Test Coverage Overview

The test suite covers three key pipeline splitting strategies: balanced split, balanced split v2, and uniform split.

Key areas tested include:
  • Model output consistency preservation
  • Symbolic tracing functionality
  • Multiple pipeline splitting algorithms
  • Integration with the torch.fx framework

Implementation Analysis

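A helper function, pipeline_pass_test_helper, runs the model once to record a reference output, traces the model with torch.fx's symbolic_trace, and applies the splitting pass under test with a pipeline size of 2. It then materializes the split with split_with_split_nodes_pass and asserts that the split model's output exactly equals the reference.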

Key patterns include:
  • Modular test helper design
  • Consistent input/output validation
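  • Automated cache clearing before the test runs (via the clear_cache_before_run decorator)
  • Model dimension, batch size, and pipeline size defined as module-level constants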
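
The tracing step can be illustrated without ColossalAI. The following is a minimal sketch, assuming only that PyTorch is installed; TinyMLP is a hypothetical two-layer stand-in for the MLP under test, not part of the test file. It shows the graph that symbolic_trace records and confirms that the traced module reproduces the original output.

```python
import torch
from torch.fx import symbolic_trace


class TinyMLP(torch.nn.Module):
    """Hypothetical two-layer stand-in for the MLP used in the test."""

    def __init__(self, dim: int):
        super().__init__()
        self.linear1 = torch.nn.Linear(dim, dim)
        self.linear2 = torch.nn.Linear(dim, dim)

    def forward(self, x):
        return self.linear2(self.linear1(x))


torch.manual_seed(0)
model = TinyMLP(16)
data = torch.rand(8, 16)

# symbolic_trace returns a GraphModule whose graph lists every recorded op.
traced = symbolic_trace(model)
node_ops = [node.op for node in traced.graph.nodes]

# The traced module calls the same submodules in the same order,
# so its output should match the original exactly.
same_output = torch.equal(traced(data), model(data))
```

Each call_module node in the graph corresponds to one linear layer, which is the granularity at which a splitting pass can place stage boundaries.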

Technical Details

Testing infrastructure includes:
  • PyTorch FX for symbolic tracing
  • Custom MLP model with 4 linear layers
  • Pipeline size of 2
  • Model dimension of 16
  • Batch size of 8
  • ColossalAI pipeline splitting passes
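
The configuration above can be worked through by hand. The sketch below is plain Python and does not call ColossalAI. It assumes that a "uniform" split means equal layer counts per stage and that a "balanced" split means roughly equal parameter counts per stage; the real passes may use different criteria. The function names uniform_split and balanced_split are hypothetical.

```python
MODEL_DIM = 16
PIPELINE_SIZE = 2
NUM_LAYERS = 4


def linear_param_count(in_features: int, out_features: int) -> int:
    """Weights plus biases of one fully connected layer."""
    return in_features * out_features + out_features


def uniform_split(num_layers: int, num_stages: int) -> list:
    """Assumed 'uniform' strategy: equal layer counts per stage."""
    base, extra = divmod(num_layers, num_stages)
    stages, start = [], 0
    for stage_idx in range(num_stages):
        size = base + (1 if stage_idx < extra else 0)
        stages.append(list(range(start, start + size)))
        start += size
    return stages


def balanced_split(param_counts: list, num_stages: int) -> list:
    """Assumed 'balanced' strategy: greedily close a stage once it holds
    its share of the parameters. Skewed inputs can leave the final stage
    empty; this sketch does not handle that case."""
    target = sum(param_counts) / num_stages
    stages, current, accumulated = [], [], 0
    for idx, count in enumerate(param_counts):
        current.append(idx)
        accumulated += count
        if accumulated >= target and len(stages) < num_stages - 1:
            stages.append(current)
            current, accumulated = [], 0
    stages.append(current)
    return stages


params = [linear_param_count(MODEL_DIM, MODEL_DIM) for _ in range(NUM_LAYERS)]
total_params = sum(params)
uniform = uniform_split(NUM_LAYERS, PIPELINE_SIZE)
balanced = balanced_split(params, PIPELINE_SIZE)
```

Each layer holds 16 × 16 + 16 = 272 parameters, for 1088 in total. Because all four layers are identical, both assumed strategies place layers 0 and 1 in the first stage and layers 2 and 3 in the second.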

Best Practices Demonstrated

The test implementation demonstrates the following practices:
  • Isolation of test cases using cache clearing
  • Reusable test helper functions
  • Consistent assertion patterns
  • Clear separation of test setup and execution
  • Test configuration held in named constants (MODEL_DIM, BATCH_SIZE, PIPELINE_SIZE)
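
The reusable-helper pattern listed above can be sketched in plain Python, with ordinary functions standing in for layers. All names here (run_stages, split_in_half, split_one_per_stage, check_split_preserves_output) are hypothetical and chosen for illustration; the sketch mirrors the shape of the test helper rather than its implementation.

```python
def run_stages(stages, x):
    """Feed x through every layer of every stage, in order."""
    for stage in stages:
        for layer in stage:
            x = layer(x)
    return x


def split_in_half(layers):
    """Two stages, each holding half of the layers."""
    mid = len(layers) // 2
    return [layers[:mid], layers[mid:]]


def split_one_per_stage(layers):
    """One stage per layer."""
    return [[layer] for layer in layers]


def check_split_preserves_output(layers, data, split_fn):
    """Reusable helper: one reference run, one split run, one assertion."""
    expected = run_stages([layers], data)
    actual = run_stages(split_fn(layers), data)
    assert actual == expected, f"{split_fn.__name__} changed the output"
    return actual


# Integer functions stand in for the linear layers of the real model.
layers = [lambda v: v + 1, lambda v: v * 2, lambda v: v - 3, lambda v: v * v]

# The same helper is reused for every strategy, as in the test file.
results = [
    check_split_preserves_output(layers, 5, split_fn)
    for split_fn in (split_in_half, split_one_per_stage)
]
```

Keeping the reference run and the assertion inside one helper means that adding a new strategy to the test costs a single extra call.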

hpcaitech/colossalai

tests/test_fx/test_pipeline_passes.py

            
import torch
from torch.fx import symbolic_trace

from colossalai.fx.passes.adding_split_node_pass import (
    balanced_split_pass,
    balanced_split_pass_v2,
    split_with_split_nodes_pass,
    uniform_split_pass,
)
from colossalai.testing import clear_cache_before_run

MODEL_DIM = 16
BATCH_SIZE = 8
PIPELINE_SIZE = 2


class MLP(torch.nn.Module):
    def __init__(self, dim: int):
        super().__init__()
        self.linear1 = torch.nn.Linear(dim, dim)
        self.linear2 = torch.nn.Linear(dim, dim)
        self.linear3 = torch.nn.Linear(dim, dim)
        self.linear4 = torch.nn.Linear(dim, dim)

    def forward(self, x):
        x = self.linear1(x)
        x = self.linear2(x)
        x = self.linear3(x)
        x = self.linear4(x)
        return x


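def pipeline_pass_test_helper(model, data, pass_func):
    """Check that splitting `model` with `pass_func` leaves its output unchanged."""
    origin_output = model(data)
    symbolic_traced = symbolic_trace(model)
    # Annotate the traced graph with split points for PIPELINE_SIZE stages.
    annotated_model = pass_func(symbolic_traced, PIPELINE_SIZE)
    # Split the annotated graph at those points; the submodules are not used here.
    split_model, split_submodules = split_with_split_nodes_pass(annotated_model)
    output = split_model(data)
    # Tensor.equal requires identical shape and exactly equal values.
    assert output.equal(origin_output)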


@clear_cache_before_run()
def test_pipeline_passes():
    model = MLP(MODEL_DIM)
    data = torch.rand(BATCH_SIZE, MODEL_DIM)
    pipeline_pass_test_helper(model, data, balanced_split_pass)
    pipeline_pass_test_helper(model, data, balanced_split_pass_v2)
    pipeline_pass_test_helper(model, data, uniform_split_pass)


if __name__ == "__main__":
    test_pipeline_passes()