Back to Repositories

Testing Sequence Reporting Callback Mechanisms in LMAX-Disruptor

This test suite validates the sequence reporting callback functionality in the LMAX Disruptor’s event processing system. It ensures proper sequence tracking and callback behavior in the RingBuffer implementation for multi-producer scenarios.

Test Coverage Overview

The test suite focuses on verifying sequence reporting mechanisms in the Disruptor’s event processing pipeline.

Key areas covered include:
  • Sequence callback initialization and updates
  • Multi-producer RingBuffer sequence tracking
  • Event handler sequence reporting accuracy
  • Batch processing boundary conditions

Implementation Analysis

The testing approach employs JUnit Jupiter to validate sequence reporting behavior through a mock event handler implementation. It uses CountDownLatch for synchronization control and implements custom EventHandler interface to track sequence progression.

Notable patterns include:
  • Asynchronous event processing verification
  • Thread-safe sequence reporting
  • Batch processing validation

Technical Details

Testing infrastructure includes:
  • JUnit Jupiter test framework
  • RingBuffer with MultiProducer configuration
  • Custom StubEvent implementation
  • BatchEventProcessor for event handling
  • CountDownLatch for thread synchronization
  • Sequence tracking through callback mechanism

Best Practices Demonstrated

The test exemplifies high-quality testing practices for concurrent systems.

Notable practices include:
  • Proper thread management and cleanup
  • Explicit sequence validation at critical points
  • Clear separation of test setup and assertions
  • Effective use of latches for synchronization
  • Comprehensive state verification

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java

            
/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor;

import com.lmax.disruptor.support.StubEvent;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;

import static com.lmax.disruptor.RingBuffer.createMultiProducer;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class SequenceReportingCallbackTest
{
    private final CountDownLatch callbackLatch = new CountDownLatch(1);
    private final CountDownLatch onEndOfBatchLatch = new CountDownLatch(1);

    @Test
    public void shouldReportProgressByUpdatingSequenceViaCallback()
        throws Exception
    {
        final RingBuffer<StubEvent> ringBuffer = createMultiProducer(StubEvent.EVENT_FACTORY, 16);
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
        final EventHandler<StubEvent> handler = new TestSequenceReportingEventHandler();
        final BatchEventProcessor<StubEvent> batchEventProcessor = new BatchEventProcessorBuilder().build(
                ringBuffer, sequenceBarrier, handler);
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        Thread thread = new Thread(batchEventProcessor);
        thread.setDaemon(true);
        thread.start();

        assertEquals(-1L, batchEventProcessor.getSequence().get());
        ringBuffer.publish(ringBuffer.next());

        callbackLatch.await();
        assertEquals(0L, batchEventProcessor.getSequence().get());

        onEndOfBatchLatch.countDown();
        assertEquals(0L, batchEventProcessor.getSequence().get());

        batchEventProcessor.halt();
        thread.join();
    }

    private class TestSequenceReportingEventHandler implements EventHandler<StubEvent>
    {
        private Sequence sequenceCallback;

        @Override
        public void setSequenceCallback(final Sequence sequenceTrackerCallback)
        {
            this.sequenceCallback = sequenceTrackerCallback;
        }

        @Override
        public void onEvent(final StubEvent event, final long sequence, final boolean endOfBatch) throws Exception
        {
            sequenceCallback.set(sequence);
            callbackLatch.countDown();

            if (endOfBatch)
            {
                onEndOfBatchLatch.await();
            }
        }
    }
}