Testing Distributed Temporary Directory Management in Colossal-AI

This test utility from Colossal-AI's checkpoint I/O test directory defines shared_tempdir, a context manager that gives every process in a distributed job the same temporary directory. Rank 0 creates the directory, its path is broadcast to the other ranks, and a barrier delays cleanup until every rank has finished with it.

Test Coverage Overview

The helper covers the following behavior:

  • Creates one temporary directory on rank 0 and removes it when the context exits
  • Shares the directory path with every rank using dist.broadcast_object_list
  • Calls dist.barrier so that no rank leaves the context before the others reach the same point
  • Gives non-zero ranks a nullcontext placeholder, so they never create a directory of their own

Implementation Analysis

The helper is a generator-based context manager (contextlib.contextmanager) that wraps rank-specific setup and collective teardown in a single with block.

Key implementation aspects include:
  • Rank-based conditional temporary directory creation
  • Object broadcasting for directory path synchronization
  • Barrier synchronization for coordinated cleanup
  • Null context handling for non-zero ranks
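
The rank-based selection listed above can be tried without a distributed runtime. The following is a minimal sketch using only the standard library; tempdir_context_for is a hypothetical helper introduced for illustration, and the rank is passed in as a plain integer instead of being read from dist.get_rank().

```python
import os
import tempfile
from contextlib import nullcontext


def tempdir_context_for(rank: int):
    """Hypothetical helper: pick the context manager a given rank would enter."""
    # Only rank 0 owns a real directory; every other rank gets a no-op context.
    return tempfile.TemporaryDirectory() if rank == 0 else nullcontext()


with tempdir_context_for(0) as owner_path:
    # TemporaryDirectory yields the path of a directory that exists on disk.
    owner_sees_dir = os.path.isdir(owner_path)

with tempdir_context_for(1) as placeholder:
    # nullcontext() yields None, so this rank has no path until one is broadcast.
    non_owner_value = placeholder

# Leaving the with block on the owning rank removes the directory.
owner_cleaned_up = not os.path.exists(owner_path)
```

Because the non-owning ranks start with None, the broadcast step is what turns the placeholder into a usable path.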

Technical Details

Testing infrastructure utilizes:

  • PyTorch distributed (torch.distributed)
  • Python’s tempfile module
  • contextlib (contextmanager and nullcontext) for context management
  • Type hints (Iterator[str] as the return annotation)
  • A try/finally block so the barrier runs even if the body of the with statement raises

Best Practices Demonstrated

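The implementation illustrates several practices for distributed test helpers.

  • Resource cleanup tied to a context manager rather than manual deletion
  • Rank-aware setup: only rank 0 creates and owns the directory
  • An annotated return type (Iterator[str]) that documents what the context yields
  • One broadcast and one barrier per use, which keeps synchronization minimal
  • Ordered cleanup: the barrier in the finally block runs before rank 0 deletes the directory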
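
The cleanup ordering can be illustrated with a thread-based simulation. This is only a sketch under stated assumptions: threads stand in for ranks, threading.Barrier stands in for dist.barrier, and a shared list plus an Event stand in for dist.broadcast_object_list. The names simulated_shared_tempdir, worker, and WORLD_SIZE are hypothetical and are not part of Colossal-AI.

```python
import os
import tempfile
import threading
from contextlib import contextmanager, nullcontext
from typing import Dict, Iterator, List

WORLD_SIZE = 3  # assumed number of simulated ranks
_barrier = threading.Barrier(WORLD_SIZE)  # stand-in for dist.barrier()
_path_ready = threading.Event()           # signals that "rank 0" published the path
_shared: List[str] = []                   # stand-in for the broadcast payload


@contextmanager
def simulated_shared_tempdir(rank: int) -> Iterator[str]:
    ctx = tempfile.TemporaryDirectory() if rank == 0 else nullcontext()
    with ctx as tempdir:
        try:
            if rank == 0:
                _shared.append(tempdir)  # "broadcast" the path
                _path_ready.set()
            else:
                _path_ready.wait()       # "receive" the path
            yield _shared[0]
        finally:
            # Nobody passes this point until every simulated rank has arrived,
            # so rank 0 cannot delete the directory while others still use it.
            _barrier.wait()


results: Dict[int, bool] = {}


def worker(rank: int) -> None:
    with simulated_shared_tempdir(rank) as path:
        marker = os.path.join(path, f"rank{rank}.txt")
        with open(marker, "w") as fh:
            fh.write("ok")
        results[rank] = os.path.exists(marker)


threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()

shared_path = _shared[0]
```

Every simulated rank writes its marker file before the barrier releases, and the directory disappears only after rank 0 exits its with block.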

hpcaitech/colossalai

tests/test_checkpoint_io/utils.py

            
import tempfile
from contextlib import contextmanager, nullcontext
from typing import Iterator

import torch.distributed as dist


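@contextmanager
def shared_tempdir() -> Iterator[str]:
    """
    A temporary directory that is shared across all processes.

    Rank 0 creates the directory and broadcasts its path. The path is only
    usable on other ranks if they can access the same filesystem.
    """
    # Only rank 0 creates a real directory; other ranks enter a no-op context that yields None.
    ctx_fn = tempfile.TemporaryDirectory if dist.get_rank() == 0 else nullcontext
    with ctx_fn() as tempdir:
        try:
            # Wrap the path in a list: broadcast_object_list fills the list in place on non-source ranks.
            obj = [tempdir]
            dist.broadcast_object_list(obj, src=0)
            tempdir = obj[0]  # use the same directory on all ranks
            yield tempdir
        finally:
            # Wait for every rank before rank 0 leaves the with block and deletes the directory.
            dist.barrier()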
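
A possible way to exercise the helper is a one-process group on the gloo backend, which needs no GPU or launcher. The sketch below is an assumption-laden example, not how the Colossal-AI tests invoke it: run_single_process_demo is a hypothetical function, the file-based rendezvous path is arbitrary, and shared_tempdir is copied from the listing above so the block runs on its own.

```python
import os
import tempfile
from contextlib import contextmanager, nullcontext
from typing import Dict, Iterator

import torch.distributed as dist


@contextmanager
def shared_tempdir() -> Iterator[str]:
    # Copied from the listing above so this sketch is self-contained.
    ctx_fn = tempfile.TemporaryDirectory if dist.get_rank() == 0 else nullcontext
    with ctx_fn() as tempdir:
        try:
            obj = [tempdir]
            dist.broadcast_object_list(obj, src=0)
            tempdir = obj[0]
            yield tempdir
        finally:
            dist.barrier()


def run_single_process_demo() -> Dict[str, bool]:
    """Hypothetical driver: a world of size 1, so rank 0 is the only rank."""
    with tempfile.TemporaryDirectory() as rendezvous_dir:
        rendezvous_file = os.path.join(rendezvous_dir, "rendezvous")
        dist.init_process_group(
            backend="gloo",
            init_method=f"file://{rendezvous_file}",
            rank=0,
            world_size=1,
        )
        try:
            with shared_tempdir() as path:
                existed_inside = os.path.isdir(path)
            # After the with block, rank 0 has removed the directory.
            return {"existed_inside": existed_inside, "exists_after": os.path.exists(path)}
        finally:
            dist.destroy_process_group()


demo = run_single_process_demo()
```

With more than one rank, each process would call the same code with its own rank and the shared world_size, typically through a launcher such as torchrun.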