Back to Repositories

Testing Data Sampling Strategies in Coqui-TTS

This test suite validates data sampling functionality in the TTS library, focusing on balanced sampling across languages, speakers, and audio lengths. It ensures proper batch creation and data distribution for training text-to-speech models.

Test Coverage Overview

The test suite provides comprehensive coverage of various sampling strategies:
  • Language balancing across English and Portuguese samples
  • Speaker balancing between different voice profiles
  • Audio length balancing across different duration ranges
  • Perfect batch sampling with controlled class distribution
  • Bucket batch sampling for efficient text length grouping

Implementation Analysis

The testing approach employs PyTorch’s sampling utilities and custom implementations:
  • Uses WeightedRandomSampler for balanced distribution
  • Implements PerfectBatchSampler for exact class ratios
  • Utilizes BucketBatchSampler for length-based batching
  • Validates both shuffled and sequential sampling modes

Technical Details

Testing infrastructure includes:
  • PyTorch’s data sampling frameworks
  • Custom balancer weight calculations
  • Fixed random seeds for reproducibility
  • Dataset configurations for multiple languages
  • Mock audio length generation for testing

Best Practices Demonstrated

The test suite exemplifies robust testing practices:
  • Systematic validation of balance ratios
  • Edge case handling with random audio lengths
  • Comprehensive batch composition verification
  • Clear separation of test cases by sampling strategy
  • Proper test isolation and setup

coqui-ai/tts

tests/data_tests/test_samplers.py

            
import functools
import random
import unittest

import torch

from TTS.config.shared_configs import BaseDatasetConfig
from TTS.tts.datasets import load_tts_samples
from TTS.tts.utils.data import get_length_balancer_weights
from TTS.tts.utils.languages import get_language_balancer_weights
from TTS.tts.utils.speakers import get_speaker_balancer_weights
from TTS.utils.samplers import BucketBatchSampler, PerfectBatchSampler

# Fix the random state to avoid random failures. Both generators used by this
# module are seeded: the torch samplers draw from torch's generator, while
# `test_length_weighted_random_sampler` draws its audio lengths from the
# stdlib `random` module.
torch.manual_seed(0)
random.seed(0)

# English view of the LJSpeech test fixture.
dataset_config_en = BaseDatasetConfig(
    formatter="ljspeech",
    meta_file_train="metadata.csv",
    meta_file_val="metadata.csv",
    path="tests/data/ljspeech",
    language="en",
)

# Same fixture files, but labelled as Brazilian Portuguese.
dataset_config_pt = BaseDatasetConfig(
    formatter="ljspeech",
    meta_file_train="metadata.csv",
    meta_file_val="metadata.csv",
    path="tests/data/ljspeech",
    language="pt-br",
)

# Adding the EN samples twice to create a language-unbalanced dataset
train_samples, eval_samples = load_tts_samples(
    [dataset_config_en, dataset_config_en, dataset_config_pt], eval_split=True
)

# Generate a speaker-unbalanced dataset: the first five samples are assigned
# to "ljspeech-0" and every remaining sample to "ljspeech-1".
for i, sample in enumerate(train_samples):
    sample["speaker_name"] = "ljspeech-0" if i < 5 else "ljspeech-1"


def is_balanced(lang_1, lang_2):
    """Return whether two class counts are roughly equal.

    The counts are considered balanced when the ratio ``lang_1 / lang_2``
    lies strictly between 0.85 and 1.2.

    Args:
        lang_1: Number of drawn samples belonging to the first class.
        lang_2: Number of drawn samples belonging to the second class.

    Returns:
        bool: ``True`` if the ratio is inside the tolerance band, ``False``
        otherwise. A ``lang_2`` of zero (the second class was never drawn)
        is reported as unbalanced instead of raising ``ZeroDivisionError``.
    """
    if lang_2 == 0:
        # No ratio can be computed; a class that never appears is not balanced.
        return False
    return 0.85 < lang_1 / lang_2 < 1.2


class TestSamplers(unittest.TestCase):
    """Check how the samplers distribute languages, speakers and audio lengths.

    Every test reads the module-level ``train_samples`` list. The statistical
    tests iterate a sampler ``NUM_PASSES`` times, count how often each class
    was drawn and compare the two counts with ``is_balanced``.
    """

    # How many times a sampler is iterated before the class counts are compared.
    NUM_PASSES = 100

    @staticmethod
    def _draw(sampler, num_passes=NUM_PASSES):
        """Iterate ``sampler`` ``num_passes`` times and return all yielded items in one flat list."""
        return [item for _ in range(num_passes) for item in sampler]

    @staticmethod
    def _count_by_key(indices, key, value):
        """Split ``indices`` by whether ``train_samples[index][key] == value``.

        Args:
            indices: Iterable of positions into ``train_samples``.
            key: Sample field to inspect.
            value: Value identifying the first class.

        Returns:
            tuple: ``(matching, others)`` counts.
        """
        hits = [train_samples[index][key] == value for index in indices]
        matching = sum(hits)
        return matching, len(hits) - matching

    def _check_perfect_sampler(self, shuffle, drop_last):
        """Assert that every batch of a PerfectBatchSampler holds both speakers equally often."""
        classes = {item["speaker_name"] for item in train_samples}

        sampler = PerfectBatchSampler(
            train_samples,
            classes,
            batch_size=2 * 3,  # total batch size
            num_classes_in_batch=2,
            label_key="speaker_name",
            shuffle=shuffle,
            drop_last=drop_last,
        )
        for batch in self._draw(sampler):
            spk1, spk2 = self._count_by_key(batch, "speaker_name", "ljspeech-0")
            self.assertEqual(spk1, spk2, "PerfectBatchSampler is supposed to be perfectly balanced")

    def test_language_random_sampler(self):
        """A plain RandomSampler is expected to keep the dataset's language imbalance."""
        random_sampler = torch.utils.data.RandomSampler(train_samples)
        ids = self._draw(random_sampler)
        en, pt = self._count_by_key(ids, "language", "en")

        self.assertFalse(is_balanced(en, pt), "Random sampler is supposed to be unbalanced")

    def test_language_weighted_random_sampler(self):
        """Sampling with the language balancer weights is expected to even out the languages."""
        weighted_sampler = torch.utils.data.sampler.WeightedRandomSampler(
            get_language_balancer_weights(train_samples), len(train_samples)
        )
        ids = self._draw(weighted_sampler)
        en, pt = self._count_by_key(ids, "language", "en")

        self.assertTrue(is_balanced(en, pt), "Language Weighted sampler is supposed to be balanced")

    def test_speaker_weighted_random_sampler(self):
        """Sampling with the speaker balancer weights is expected to even out the speakers."""
        weighted_sampler = torch.utils.data.sampler.WeightedRandomSampler(
            get_speaker_balancer_weights(train_samples), len(train_samples)
        )
        ids = self._draw(weighted_sampler)
        spk1, spk2 = self._count_by_key(ids, "speaker_name", "ljspeech-0")

        self.assertTrue(is_balanced(spk1, spk2), "Speaker Weighted sampler is supposed to be balanced")

    def test_perfect_sampler(self):
        """Unshuffled PerfectBatchSampler, dropping the last batch."""
        self._check_perfect_sampler(shuffle=False, drop_last=True)

    def test_perfect_sampler_shuffle(self):
        """Shuffled PerfectBatchSampler, keeping the last batch."""
        self._check_perfect_sampler(shuffle=True, drop_last=False)

    def test_length_weighted_random_sampler(self):
        """Sampling with the length balancer weights is expected to even out short and long clips."""
        for _ in range(1000):
            # Generate a length-unbalanced dataset with random max/min audio length.
            # The first five samples get a short length (below 44100), all others
            # a length of at least `max_audio`, so the two groups never overlap.
            min_audio = random.randrange(1, 22050)
            max_audio = random.randrange(44100, 220500)
            for idx, item in enumerate(train_samples):
                # increase the diversity of durations
                random_increase = random.randrange(100, 1000)
                base_length = min_audio if idx < 5 else max_audio
                item["audio_length"] = base_length + random_increase

            weighted_sampler = torch.utils.data.sampler.WeightedRandomSampler(
                get_length_balancer_weights(train_samples, num_buckets=2), len(train_samples)
            )
            ids = self._draw(weighted_sampler)
            len1 = sum(1 for index in ids if train_samples[index]["audio_length"] < max_audio)
            len2 = len(ids) - len1
            self.assertTrue(is_balanced(len1, len2), "Length Weighted sampler is supposed to be balanced")

    def test_bucket_batch_sampler(self):
        """Check the text-length ordering of collected batches and the reported sampler length."""
        bucket_size_multiplier = 2
        sampler = range(len(train_samples))
        sampler = BucketBatchSampler(
            sampler,
            data=train_samples,
            batch_size=7,
            drop_last=True,
            sort_key=lambda x: len(x["text"]),
            bucket_size_multiplier=bucket_size_multiplier,
        )

        # Check that the samples are sorted by text length while bucketing.
        # NOTE(review): the batch that triggers the check (every
        # `bucket_size_multiplier`-th one) is itself never added to
        # `bucket_items`, so only the batches collected before it are
        # verified - confirm this is intended.
        min_text_len_in_bucket = 0
        bucket_items = []
        for batch_idx, batch in enumerate(list(sampler)):
            if (batch_idx + 1) % bucket_size_multiplier == 0:
                for bucket_item in bucket_items:
                    text_len = len(train_samples[bucket_item]["text"])
                    self.assertLessEqual(min_text_len_in_bucket, text_len)
                    min_text_len_in_bucket = text_len
                min_text_len_in_bucket = 0
                bucket_items = []
            else:
                bucket_items += batch

        # check sampler length
        self.assertEqual(len(sampler), len(train_samples) // 7)