Back to Repositories

Testing High-Concurrency Ring Buffer Operations in LMAX-Disruptor

This stress test suite validates the LMAX Disruptor’s performance and reliability under high-concurrency scenarios. It focuses on testing the ring buffer implementation with multiple producers and handlers while verifying data consistency and thread safety.

Test Coverage Overview

The test suite provides comprehensive coverage of the Disruptor’s concurrent operations.

Key areas tested include:
  • Multi-threaded producer/consumer scenarios
  • Ring buffer operations under heavy load
  • Thread synchronization and coordination
  • Data consistency across concurrent operations
Edge cases cover maximum thread utilization and boundary conditions for the ring buffer.

Implementation Analysis

The testing approach employs JUnit to validate concurrent processing capabilities of the Disruptor framework. The implementation uses a combination of CyclicBarrier for thread coordination and CountDownLatch for completion synchronization.

Key patterns include:
  • Producer-Consumer pattern implementation
  • Event-driven architecture testing
  • Concurrent operation verification

Technical Details

Testing infrastructure includes:
  • JUnit 5 testing framework
  • ExecutorService for thread management
  • BusySpinWaitStrategy for wait strategy
  • Custom event handlers and publishers
  • Configurable thread count based on available processors

Best Practices Demonstrated

The test suite exemplifies high-quality concurrent testing practices.

Notable features include:
  • Proper thread synchronization mechanisms
  • Comprehensive error handling and verification
  • Scalable test design based on system resources
  • Clean separation of test components
  • Effective use of assertion patterns

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/DisruptorStressTest.java

            
package com.lmax.disruptor;

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import static java.lang.Math.max;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;

public class DisruptorStressTest
{
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @Test
    public void shouldHandleLotsOfThreads() throws Exception
    {
        Disruptor<TestEvent> disruptor = new Disruptor<>(
                TestEvent.FACTORY, 1 << 16, DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI, new BusySpinWaitStrategy());
        RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
        disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());

        int threads = max(1, Runtime.getRuntime().availableProcessors() / 2);

        int iterations = 200000;
        int publisherCount = threads;
        int handlerCount = threads;

        CyclicBarrier barrier = new CyclicBarrier(publisherCount);
        CountDownLatch latch = new CountDownLatch(publisherCount);

        TestEventHandler[] handlers = initialise(disruptor, new TestEventHandler[handlerCount]);
        Publisher[] publishers = initialise(new Publisher[publisherCount], ringBuffer, iterations, barrier, latch);

        disruptor.start();

        for (Publisher publisher : publishers)
        {
            executor.execute(publisher);
        }

        latch.await();
        while (ringBuffer.getCursor() < (iterations - 1))
        {
            LockSupport.parkNanos(1);
        }

        disruptor.shutdown();

        for (Publisher publisher : publishers)
        {
            assertThat(publisher.failed, is(false));
        }

        for (TestEventHandler handler : handlers)
        {
            assertThat(handler.messagesSeen, is(not(0)));
            assertThat(handler.failureCount, is(0));
        }
    }

    private Publisher[] initialise(
            final Publisher[] publishers, final RingBuffer<TestEvent> buffer,
            final int messageCount, final CyclicBarrier barrier, final CountDownLatch latch)
    {
        for (int i = 0; i < publishers.length; i++)
        {
            publishers[i] = new Publisher(buffer, messageCount, barrier, latch);
        }

        return publishers;
    }

    @SuppressWarnings("unchecked")
    private TestEventHandler[] initialise(final Disruptor<TestEvent> disruptor, final TestEventHandler[] testEventHandlers)
    {
        for (int i = 0; i < testEventHandlers.length; i++)
        {
            TestEventHandler handler = new TestEventHandler();
            disruptor.handleEventsWith(handler);
            testEventHandlers[i] = handler;
        }

        return testEventHandlers;
    }

    private static class TestEventHandler implements EventHandler<TestEvent>
    {
        public int failureCount = 0;
        public int messagesSeen = 0;

        TestEventHandler()
        {
        }

        @Override
        public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
        {
            if (event.sequence != sequence ||
                event.a != sequence + 13 ||
                event.b != sequence - 7 ||
                !("wibble-" + sequence).equals(event.s))
            {
                failureCount++;
            }

            messagesSeen++;
        }
    }

    private static class Publisher implements Runnable
    {
        private final RingBuffer<TestEvent> ringBuffer;
        private final CyclicBarrier barrier;
        private final int iterations;
        private final CountDownLatch shutdownLatch;

        public boolean failed = false;

        Publisher(
            final RingBuffer<TestEvent> ringBuffer,
            final int iterations,
            final CyclicBarrier barrier,
            final CountDownLatch shutdownLatch)
        {
            this.ringBuffer = ringBuffer;
            this.barrier = barrier;
            this.iterations = iterations;
            this.shutdownLatch = shutdownLatch;
        }

        @Override
        public void run()
        {
            try
            {
                barrier.await();

                int i = iterations;
                while (--i != -1)
                {
                    long next = ringBuffer.next();
                    TestEvent testEvent = ringBuffer.get(next);
                    testEvent.sequence = next;
                    testEvent.a = next + 13;
                    testEvent.b = next - 7;
                    testEvent.s = "wibble-" + next;
                    ringBuffer.publish(next);
                }
            }
            catch (Exception e)
            {
                failed = true;
            }
            finally
            {
                shutdownLatch.countDown();
            }
        }
    }

    private static class TestEvent
    {
        public long sequence;
        public long a;
        public long b;
        public String s;

        public static final EventFactory<TestEvent> FACTORY = () -> new TestEvent();
    }
}