Back to Repositories

Testing BatchEventProcessor Size Limits in LMAX-Disruptor

This test suite validates the batch size limiting functionality in the LMAX Disruptor’s event processing system. It focuses on ensuring the BatchEventProcessor correctly handles maximum batch sizes and properly announces batch metrics during event processing.

Test Coverage Overview

The test suite comprehensively covers batch processing behavior in the Disruptor framework:
  • Verification of configured maximum batch size limits
  • Batch size announcement validation
  • Queue depth monitoring
  • Event sequence tracking across multiple batches

Implementation Analysis

The testing approach utilizes JUnit 5 with a custom BatchLimitRecordingHandler to track event processing:
  • Setup includes RingBuffer initialization with single producer
  • Uses CountDownLatch for synchronization
  • Implements event recording through sequence tracking
  • Validates batch metrics through list comparisons

Technical Details

Testing infrastructure includes:
  • JUnit Jupiter test framework
  • RingBuffer with 16 slots configuration
  • Custom BatchEventProcessorBuilder with MAX_BATCH_SIZE of 3
  • ThreadLocal execution environment
  • StubEvent for event simulation

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Proper test lifecycle management with @BeforeEach and @AfterEach
  • Clean thread handling and resource cleanup
  • Clear separation of setup, execution, and verification phases
  • Comprehensive batch processing validation
  • Effective use of custom event handlers for detailed verification

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java

            
/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

import com.lmax.disruptor.support.StubEvent;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static com.lmax.disruptor.RingBuffer.createSingleProducer;
import static org.junit.jupiter.api.Assertions.assertEquals;

public final class MaxBatchSizeEventProcessorTest
{
    public static final int MAX_BATCH_SIZE = 3;
    public static final int PUBLISH_COUNT = 5;
    private final RingBuffer<StubEvent> ringBuffer = createSingleProducer(StubEvent.EVENT_FACTORY, 16);
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private CountDownLatch countDownLatch;
    private BatchEventProcessor<StubEvent> batchEventProcessor;
    private Thread thread;
    private BatchLimitRecordingHandler eventHandler;

    @BeforeEach
    void setUp()
    {
        countDownLatch = new CountDownLatch(PUBLISH_COUNT);
        eventHandler = new BatchLimitRecordingHandler(countDownLatch);

        batchEventProcessor = new BatchEventProcessorBuilder()
                .setMaxBatchSize(MAX_BATCH_SIZE)
                .build(ringBuffer, this.sequenceBarrier, eventHandler);

        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        thread = new Thread(batchEventProcessor);
        thread.start();
    }

    @Test
    public void shouldLimitTheBatchToConfiguredMaxBatchSize() throws Exception
    {
        publishEvents();

        assertEquals(eventHandler.batchedSequences, Arrays.asList(Arrays.asList(0L, 1L, 2L), Arrays.asList(3L, 4L)));
    }

    @Test
    public void shouldAnnounceBatchSizeAndQueueDepthAtTheStartOfBatch() throws Exception
    {
        publishEvents();

        assertEquals(eventHandler.announcedBatchSizes, Arrays.asList(3L, 2L));
        assertEquals(eventHandler.announcedQueueDepths, Arrays.asList(5L, 2L));
    }

    @AfterEach
    void tearDown() throws InterruptedException
    {
        batchEventProcessor.halt();
        thread.join();
    }

    private void publishEvents() throws InterruptedException
    {
        long sequence = 0;
        for (int i = 0; i < PUBLISH_COUNT; i++)
        {
            sequence = ringBuffer.next();
        }
        ringBuffer.publish(sequence);

        //Wait for consumer to process all events
        countDownLatch.await();
    }

    private static class BatchLimitRecordingHandler implements EventHandler<StubEvent>
    {
        public final List<List<Long>> batchedSequences = new ArrayList<>();
        private List<Long> currentSequences;
        private final CountDownLatch countDownLatch;
        private final List<Long> announcedBatchSizes = new ArrayList<>();
        private final List<Long> announcedQueueDepths = new ArrayList<>();

        BatchLimitRecordingHandler(final CountDownLatch countDownLatch)
        {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void onEvent(final StubEvent event, final long sequence, final boolean endOfBatch) throws Exception
        {
            currentSequences.add(sequence);
            if (endOfBatch)
            {
                batchedSequences.add(currentSequences);
                currentSequences = null;
            }

            countDownLatch.countDown();
        }

        @Override
        public void onBatchStart(final long batchSize, final long queueDepth)
        {
            currentSequences = new ArrayList<>();
            announcedBatchSizes.add(batchSize);
            announcedQueueDepths.add(queueDepth);
        }
    }
}