Back to Repositories

Testing Parallel Event Batch Processing in LMAX-Disruptor

This test suite validates the batching functionality in the LMAX Disruptor’s event processing system. It examines both single and multi-producer scenarios, ensuring proper event handling and batch processing capabilities for high-performance applications.

Test Coverage Overview

The test suite provides comprehensive coverage of the Disruptor’s batching mechanism.

Key areas tested include:
  • Parallel event processing with multiple handlers
  • Batch size management and processing
  • Event sequence tracking and validation
  • Both single and multi-producer scenarios
  • End-of-batch condition handling

Implementation Analysis

The test implementation uses JUnit 5’s parameterized testing approach to validate both producer types. It employs a custom ParallelEventHandler class that processes events based on sequence masks and ordinals, simulating real-world batch processing scenarios.

The testing pattern demonstrates concurrent event handling with precise control over batch sizes and processing delays.

Technical Details

Testing tools and configuration:
  • JUnit Jupiter for parameterized testing
  • Disruptor RingBuffer configuration with 2048 buffer size
  • SleepingWaitStrategy for event processing
  • DaemonThreadFactory for thread management
  • Custom EventTranslator for event publishing
  • Parallel event handlers with configurable batch sizes

Best Practices Demonstrated

The test exhibits several testing best practices for concurrent systems.

Notable practices include:
  • Parameterized test cases for different producer types
  • Proper thread synchronization and waiting mechanisms
  • Clear separation of concerns between event handling and verification
  • Comprehensive assertion checks for both event counts and processed values
  • Efficient batch processing validation

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/BatchingTest.java

            
package com.lmax.disruptor;

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.support.LongEvent;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

public class BatchingTest
{
    public static Stream<Arguments> generateData()
    {
        return Stream.of(arguments(ProducerType.MULTI), arguments(ProducerType.SINGLE));
    }

    private static class ParallelEventHandler implements EventHandler<LongEvent>
    {
        private final long mask;
        private final long ordinal;
        private final int batchSize = 10;

        private long eventCount;
        private long batchCount;
        private long publishedValue;
        private long tempValue;
        private volatile long processed;

        ParallelEventHandler(final long mask, final long ordinal)
        {
            this.mask = mask;
            this.ordinal = ordinal;
        }

        @Override
        public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws Exception
        {
            if ((sequence & mask) == ordinal)
            {
                eventCount++;
                tempValue = event.get();
            }

            if (endOfBatch || ++batchCount >= batchSize)
            {
                publishedValue = tempValue;
                batchCount = 0;
            }
            else
            {
                LockSupport.parkNanos(1);
            }

            processed = sequence;
        }
    }

    @SuppressWarnings("unchecked")
    @ParameterizedTest
    @MethodSource("generateData")
    public void shouldBatch(final ProducerType producerType) throws Exception
    {
        Disruptor<LongEvent> d = new Disruptor<>(
                LongEvent.FACTORY, 2048, DaemonThreadFactory.INSTANCE,
                producerType, new SleepingWaitStrategy());

        ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
        ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);

        d.handleEventsWith(handler1, handler2);

        RingBuffer<LongEvent> buffer = d.start();

        EventTranslator<LongEvent> translator = (event, sequence) -> event.set(sequence);

        int eventCount = 10000;
        for (int i = 0; i < eventCount; i++)
        {
            buffer.publishEvent(translator);
        }

        while (handler1.processed != eventCount - 1 ||
            handler2.processed != eventCount - 1)
        {
            Thread.sleep(1);
        }

        assertThat(handler1.publishedValue, CoreMatchers.is((long) eventCount - 2));
        assertThat(handler1.eventCount, CoreMatchers.is((long) eventCount / 2));
        assertThat(handler2.publishedValue, CoreMatchers.is((long) eventCount - 1));
        assertThat(handler2.eventCount, CoreMatchers.is((long) eventCount / 2));
    }
}