Back to Repositories

Testing Batch Event Processing with Rewind Capabilities in LMAX-Disruptor

This test suite validates the functionality of the RewindBatchEventProcessor in the LMAX Disruptor library, focusing on batch event processing and rewind capabilities. The tests ensure proper handling of event batches, rewind operations, and error scenarios in the high-performance event processing framework.

Test Coverage Overview

The test suite comprehensively covers batch event processing scenarios with rewind functionality.

Key areas tested include:
  • Single and multiple event batch processing
  • Rewind operations at different batch positions
  • Multiple rewind scenarios
  • Error handling and recovery mechanisms
  • Batch size limitations and boundaries

Implementation Analysis

The testing approach utilizes JUnit Jupiter framework with custom event handlers and matchers. The implementation focuses on validating the RingBuffer’s batch processing capabilities through systematic test cases that verify event sequencing, rewind behaviors, and exception handling patterns.

Key patterns include:
  • TestEventHandler implementation for event processing validation
  • Custom matcher creation for sequence verification
  • Rewind strategy implementations for different scenarios

Technical Details

Testing infrastructure includes:
  • JUnit Jupiter test framework
  • Custom EventHandler implementations
  • RingBuffer configuration with 2048 buffer size
  • BatchEventProcessor with rewind capabilities
  • Custom matcher implementations for sequence validation
  • Multiple rewind strategies including NanosecondPause and EventuallyGiveUp

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through thorough setup and validation mechanisms.

Notable practices include:
  • Comprehensive setup and teardown procedures
  • Systematic test case organization
  • Robust error handling validation
  • Clear test naming conventions
  • Extensive edge case coverage

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java

            
package com.lmax.disruptor;

import com.lmax.disruptor.dsl.stubs.StubExceptionHandler;
import com.lmax.disruptor.support.LongEvent;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RewindBatchEventProcessorTest
{
    private static final int BUFFER_SIZE = 2048;
    private RingBuffer<LongEvent> ringBuffer;
    private final List<EventResult> values = new ArrayList<>();

    @BeforeEach
    public void setUp()
    {
        ringBuffer = RingBuffer.createMultiProducer(LongEvent.FACTORY, BUFFER_SIZE);
    }

    @Test
    public void shouldRewindOnFirstEventOfBatchSizeOfOne()
    {
        fill(ringBuffer, 1);

        final TestEventHandler eventHandler = new TestEventHandler(values, List.of(rewind(0, 1)), 0, -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(event(0, 0)));
    }


    @Test
    public void shouldRewindOnFirstEventOfBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(
                values,
                singletonList(rewind(0, 1)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRewindOnEventInMiddleOfBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(
                values,
                singletonList(rewind(8, 1)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 7),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRewindOnLastEventOfBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(
                values,
                singletonList(rewind(lastSequenceNumber, 1)),
                lastSequenceNumber,
                -1
        );

        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 8),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRunBatchCompleteOnLastEventOfBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(4, 1)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 3),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRunBatchCompleteOnLastEventOfBatchOfOne()
    {
        fill(ringBuffer, 1);

        final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 1)), 0, -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 0)));
    }

    @Test
    public void shouldRewindMultipleTimes()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(8, 3)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 7),
                event(0, 7),
                event(0, 7),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRewindMultipleTimesOnLastEventInBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(lastSequenceNumber, 3)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 8),
                event(0, 8),
                event(0, 8),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRewindMultipleTimesInSameBatch()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                asList(rewind(5, 3), rewind(7, 3)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 4),
                event(0, 4),
                event(0, 4),
                event(0, 6),
                event(0, 6),
                event(0, 6),
                event(0, lastSequenceNumber)));
    }

    @Test
    public void shouldRewindMultipleTimesOnBatchOfOne()
    {
        fill(ringBuffer, 1);

        final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 3)), 0, -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        // nothing is actually written as first event is rewound
        assertThat(values, containsExactSequence(
                event(0, 0)));
    }

    @Test
    public void shouldFallOverWhenNonRewindableExceptionIsThrown()
    {
        int ringBufferEntries = 10;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, 8);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        AtomicReference<Throwable> exceptionHandled = new AtomicReference<>();
        eventProcessor.setExceptionHandler(new StubExceptionHandler(exceptionHandled));
        eventProcessor.run();
        assertEquals("not rewindable", exceptionHandled.get().getMessage());
    }

    @Test
    public void shouldProcessUpToMaxBatchSizeForEachGivenBatch()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 9),
                event(10, 19),
                event(20, lastSequenceNumber)));
    }

    @Test
    public void shouldOnlyRewindBatch()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(15, 3)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler);
        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 14),
                event(0, 14),
                event(0, 14),
                event(0, lastSequenceNumber)));
    }

    @Test
    void shouldInvokeRewindPauseStrategyOnRewind()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(15, 3)),
                lastSequenceNumber,
                -1);
        CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, rewindPauseStrategy);

        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 14),
                event(0, 14),
                event(0, 14),
                event(0, lastSequenceNumber)));

        assertEquals(3, rewindPauseStrategy.count);
    }

    @Test
    void shouldNotInvokeRewindPauseStrategyWhenNoRewindsOccur()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(-1, -1)),
                lastSequenceNumber,
                -1);
        CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy();
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, rewindPauseStrategy);

        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, lastSequenceNumber)));

        assertEquals(0, rewindPauseStrategy.count);
    }

    @Test
    void shouldCopeWithTheNanosecondRewindPauseStrategy()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                singletonList(rewind(15, 3)),
                lastSequenceNumber,
                -1);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, new NanosecondPauseBatchRewindStrategy(1000));

        eventHandler.setRewindable(eventProcessor);

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 14),
                event(0, 14),
                event(0, 14),
                event(0, lastSequenceNumber)));

    }


    @Test
    void shouldGiveUpWhenUsingTheGiveUpRewindStrategy()
    {
        int ringBufferEntries = 30;
        int lastSequenceNumber = ringBufferEntries - 1;
        fill(ringBuffer, ringBufferEntries);

        final TestEventHandler eventHandler = new TestEventHandler(values,
                asList(rewind(15, 99), rewind(25, 99)),
                lastSequenceNumber,
                -1);
        EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3);
        final BatchEventProcessor<LongEvent> eventProcessor = create(eventHandler, batchRewindStrategy);

        eventHandler.setRewindable(eventProcessor);

        AtomicReference<Throwable> exceptionHandled = new AtomicReference<>();
        eventProcessor.setExceptionHandler(new StubExceptionHandler(exceptionHandled));

        eventProcessor.run();

        assertThat(values, containsExactSequence(
                event(0, 14),
                event(0, 14),
                event(0, 14),
                event(16, 24), // unable to process 15 so it ends up skipping it
                event(16, 24),
                event(16, 24),
                event(26, lastSequenceNumber))); // unable to process 25 so it ends up skipping it
    }

    @Test
    void shouldNotAllowNullBatchRewindStrategy()
    {
        final TestEventHandler eventHandler = new TestEventHandler(values,
                asList(rewind(15, 99), rewind(25, 99)),
                -1,
                -1);
        final BatchEventProcessorBuilder batchEventProcessorBuilder = new BatchEventProcessorBuilder();
        assertThrows(NullPointerException.class, () -> batchEventProcessorBuilder.build(ringBuffer, ringBuffer.newBarrier(), eventHandler, null));
    }

    private static ForceRewindSequence rewind(final long sequenceNumberToFailOn, final long timesToFail)
    {
        return new ForceRewindSequence(sequenceNumberToFailOn, timesToFail);
    }

    private EventRangeExpectation event(final long sequenceStart, final long sequenceEnd)
    {
        return new EventRangeExpectation(sequenceStart, sequenceEnd, false);
    }

    private BatchEventProcessor<LongEvent> create(final TestEventHandler eventHandler)
    {
        return create(eventHandler, new SimpleBatchRewindStrategy());
    }

    private BatchEventProcessor<LongEvent> create(final TestEventHandler eventHandler, final BatchRewindStrategy batchRewindStrategy)
    {
        return new BatchEventProcessorBuilder().build(
                ringBuffer,
                ringBuffer.newBarrier(),
                eventHandler,
                batchRewindStrategy);
    }

    private static final class TestEventHandler implements RewindableEventHandler<LongEvent>
    {
        private final List<EventResult> values;
        private BatchEventProcessor<LongEvent> processor;
        private final List<ForceRewindSequence> forceRewindSequences;
        private final long exitValue;
        private final int nonRewindableErrorSequence;

        private TestEventHandler(
                final List<EventResult> values,
                final List<ForceRewindSequence> forceRewindSequences,
                final long exitValue,
                final int nonRewindableErrorSequence)
        {
            this.values = values;
            this.forceRewindSequences = forceRewindSequences;
            this.exitValue = exitValue;
            this.nonRewindableErrorSequence = nonRewindableErrorSequence;
        }


        public void setRewindable(final BatchEventProcessor<LongEvent> processor)
        {
            this.processor = processor;
        }

        @Override
        public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws RewindableException
        {

            if (sequence == nonRewindableErrorSequence)
            {
                throw new RuntimeException("not rewindable");
            }

            Optional<ForceRewindSequence> maybeForceRewindSequence = this.forceRewindSequences.stream()
                    .filter(r -> r.sequenceNumberToFailOn == sequence).findFirst();

            if (maybeForceRewindSequence.isPresent())
            {
                ForceRewindSequence forceRewindSequence = maybeForceRewindSequence.get();

                if (forceRewindSequence.numberOfTimesRewound != forceRewindSequence.timesToFail)
                {
                    forceRewindSequence.numberOfTimesRewound++;
                    throw new RewindableException(new RuntimeException());

                }
            }

            values.add(new EventResult(event.get(), false));

            if (sequence == exitValue)
            {
                processor.halt();
            }
        }
    }

    private static void fill(final RingBuffer<LongEvent> ringBuffer, final int batchSize)
    {
        for (long l = 0; l < batchSize; l++)
        {
            final long next = ringBuffer.next();
            ringBuffer.get(next).set(l);
            ringBuffer.publish(next);
        }
    }

    private static Matcher<List<EventResult>> containsExactSequence(final EventRangeExpectation... ranges)
    {
        return new TypeSafeMatcher<>()
        {
            @Override
            public void describeTo(final Description description)
            {
                description.appendValue(Arrays.toString(ranges));
            }

            @Override
            public boolean matchesSafely(final List<EventResult> item)
            {
                int index = 0;
                for (final EventRangeExpectation range : ranges)
                {
                    for (long v = range.sequenceStart, end = range.sequenceEnd; v <= end; v++)
                    {
                        final EventResult eventResult = item.get(index++);
                        if (eventResult.sequence != v && eventResult.batchFinish != range.batchFinish)
                        {
                            return false;
                        }
                    }
                }

                return item.size() == index;
            }
        };
    }

    private static final class EventRangeExpectation
    {
        private final long sequenceStart;
        private final long sequenceEnd;
        private final boolean batchFinish;

        EventRangeExpectation(final long sequenceStart, final long sequenceEnd, final boolean batchFinish)
        {
            this.sequenceStart = sequenceStart;
            this.sequenceEnd = sequenceEnd;
            this.batchFinish = batchFinish;
        }

        @Override
        public String toString()
        {
            return "{" +
                    sequenceStart +
                    "," + sequenceEnd +
                    "," + batchFinish +
                    '}';
        }
    }


    private static final class EventResult
    {
        final long sequence;
        final boolean batchFinish;

        private EventResult(final long sequence, final boolean batchFinish)
        {
            this.sequence = sequence;
            this.batchFinish = batchFinish;
        }

        @Override
        public String toString()
        {
            return "{" + sequence +
                    "," + batchFinish +
                    '}';
        }
    }

    private static final class CountingBatchRewindStrategy implements BatchRewindStrategy
    {
        int count = 0;

        @Override
        public RewindAction handleRewindException(final RewindableException e, final int retriesAttempted)
        {
            count++;
            return RewindAction.REWIND;
        }

    }

    private static final class ForceRewindSequence
    {
        final long sequenceNumberToFailOn;
        final long timesToFail;
        long numberOfTimesRewound = 0;

        private ForceRewindSequence(final long sequenceNumberToFailOn, final long timesToFail)
        {
            this.sequenceNumberToFailOn = sequenceNumberToFailOn;
            this.timesToFail = timesToFail;
        }
    }
}