Back to Repositories

Testing Disruptor Event Processing Workflow in LMAX-Exchange/disruptor

This test suite validates the core functionality of the LMAX Disruptor framework, focusing on event handling, processor management, and concurrency control. It ensures reliable event processing and proper dependency management between event handlers.

Test Coverage Overview

The test suite provides comprehensive coverage of the Disruptor’s key features including:
  • Event handler registration and lifecycle management
  • Event processing and sequencing
  • Exception handling and error scenarios
  • Multiple event processor configurations
  • Producer-consumer interactions
  • Custom processor integration

Implementation Analysis

The testing approach utilizes JUnit 5 framework with specialized test fixtures and stubs. Tests verify event processing order, handler dependencies, and concurrent operations using CountDownLatch and atomic references for synchronization. The implementation includes both synchronous and asynchronous event processing scenarios.

Technical Details

Testing tools and configuration:
  • JUnit Jupiter test framework
  • Custom stub implementations for threading and event handling
  • RingBuffer configuration with 4-slot size
  • BlockingWaitStrategy for event processing
  • Timeout controls for long-running tests

Best Practices Demonstrated

The test suite demonstrates several testing best practices:
  • Proper test isolation using setUp/tearDown methods
  • Comprehensive exception handling verification
  • Thorough boundary condition testing
  • Clear separation of concerns between test cases
  • Effective use of test utilities and helper methods

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java

            
/*
 * Copyright 2011 LMAX Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lmax.disruptor.dsl;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BatchEventProcessorBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SimpleBatchRewindStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.stubs.DelayedEventHandler;
import com.lmax.disruptor.dsl.stubs.EventHandlerStub;
import com.lmax.disruptor.dsl.stubs.EvilEqualsEventHandler;
import com.lmax.disruptor.dsl.stubs.ExceptionThrowingEventHandler;
import com.lmax.disruptor.dsl.stubs.SleepingEventHandler;
import com.lmax.disruptor.dsl.stubs.StubExceptionHandler;
import com.lmax.disruptor.dsl.stubs.StubPublisher;
import com.lmax.disruptor.dsl.stubs.StubThreadFactory;
import com.lmax.disruptor.support.DummyEventHandler;
import com.lmax.disruptor.support.TestEvent;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class DisruptorTest
{
    private static final int TIMEOUT_IN_SECONDS = 2;

    public final StubThreadFactory executor = new StubThreadFactory();

    private final Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<>();
    private Disruptor<TestEvent> disruptor;
    private RingBuffer<TestEvent> ringBuffer;

    @BeforeEach
    public void setUp()
    {
        createDisruptor();
    }

    @AfterEach
    public void tearDown()
    {
        for (DelayedEventHandler delayedEventHandler : delayedEventHandlers)
        {
            delayedEventHandler.stopWaiting();
        }

        disruptor.halt();
        executor.joinAllThreads();
    }

    @Test
    public void shouldHaveStartedAfterStartCalled()
    {
        assertFalse(disruptor.hasStarted(), "Should only be set to started after start is called");

        disruptor.start();

        assertTrue(disruptor.hasStarted(), "Should be set to started after start is called");
    }

    @Test
    public void shouldProcessMessagesPublishedBeforeStartIsCalled() throws Exception
    {
        final CountDownLatch eventCounter = new CountDownLatch(2);
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> eventCounter.countDown());

        disruptor.publishEvent((event, sequence) ->
        {
        });

        disruptor.start();

        disruptor.publishEvent((event, sequence) ->
        {
        });

        if (!eventCounter.await(5, TimeUnit.SECONDS))
        {
            fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
        }
    }

    @Test
    public void shouldBatchOfEvents() throws Exception
    {
        final CountDownLatch eventCounter = new CountDownLatch(2);
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> eventCounter.countDown());

        disruptor.start();

        disruptor.publishEvents((event, sequence, arg) ->
        {
        }, new Object[] { "a", "b" });

        if (!eventCounter.await(5, TimeUnit.SECONDS))
        {
            fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
        }
    }

    @Test
    public void shouldHandleEventsWithRewindableEventHandlers() throws Exception
    {
        final CountDownLatch eventCounter = new CountDownLatch(2);
        final RewindableEventHandler<TestEvent> testEventRewindableEventHandler = (event, sequence, endOfBatch) -> eventCounter.countDown();
        disruptor.handleEventsWith(new SimpleBatchRewindStrategy(), testEventRewindableEventHandler);

        disruptor.start();

        disruptor.publishEvents((event, sequence, arg) ->
        {
        }, new Object[] { "a", "b" });

        if (!eventCounter.await(5, TimeUnit.SECONDS))
        {
            fail("Did not process event published before start was called. Missed events: " + eventCounter.getCount());
        }
    }

    @Test
    public void shouldAddEventProcessorsAfterPublishing()
    {
        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
        BatchEventProcessor<TestEvent> b1 = new BatchEventProcessorBuilder().build(
                rb, rb.newBarrier(), new SleepingEventHandler());
        BatchEventProcessor<TestEvent> b2 = new BatchEventProcessorBuilder().build(
                rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler());
        BatchEventProcessor<TestEvent> b3 = new BatchEventProcessorBuilder().build(
                rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler());

        assertThat(b1.getSequence().get(), is(-1L));
        assertThat(b2.getSequence().get(), is(-1L));
        assertThat(b3.getSequence().get(), is(-1L));

        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());

        disruptor.handleEventsWith(b1, b2, b3);

        assertThat(b1.getSequence().get(), is(5L));
        assertThat(b2.getSequence().get(), is(5L));
        assertThat(b3.getSequence().get(), is(5L));
    }

    @Test
    public void shouldSetSequenceForHandlerIfAddedAfterPublish()
    {
        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
        EventHandler<TestEvent> b1 = new SleepingEventHandler();
        EventHandler<TestEvent> b2 = new SleepingEventHandler();
        EventHandler<TestEvent> b3 = new SleepingEventHandler();

        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());

        disruptor.handleEventsWith(b1, b2, b3);

        assertThat(disruptor.getSequenceValueFor(b1), is(5L));
        assertThat(disruptor.getSequenceValueFor(b2), is(5L));
        assertThat(disruptor.getSequenceValueFor(b3), is(5L));
    }

    @Test
    public void shouldGetSequenceBarrierForHandler()
    {
        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
        EventHandler<TestEvent> handler = new DummyEventHandler<>();

        disruptor.handleEventsWith(handler);
        disruptor.start();

        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());

        assertThat(disruptor.getBarrierFor(handler).getCursor(), is(5L));
    }

    @Test
    public void shouldGetSequenceBarrierForHandlerIfAddedAfterPublish()
    {
        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
        EventHandler<TestEvent> handler = new DummyEventHandler<>();

        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());
        rb.publish(rb.next());

        disruptor.handleEventsWith(handler);
        disruptor.start();

        assertThat(disruptor.getBarrierFor(handler).getCursor(), is(5L));
    }

    @Test
    public void shouldCreateEventProcessorGroupForFirstEventProcessors()
    {
        executor.ignoreExecutions();
        final EventHandler<TestEvent> eventHandler1 = new SleepingEventHandler();
        EventHandler<TestEvent> eventHandler2 = new SleepingEventHandler();

        final EventHandlerGroup<TestEvent> eventHandlerGroup =
            disruptor.handleEventsWith(eventHandler1, eventHandler2);
        disruptor.start();

        assertNotNull(eventHandlerGroup);
        assertThat(executor.getExecutionCount(), equalTo(2));
    }

    @Test
    public void shouldMakeEntriesAvailableToFirstHandlersImmediately() throws Exception
    {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> eventHandler = new EventHandlerStub<>(countDownLatch);

        disruptor.handleEventsWith(createDelayedEventHandler(), eventHandler);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
    }

    @Test
    public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvailableToDependentEventProcessors()
        throws Exception
    {
        DelayedEventHandler eventHandler1 = createDelayedEventHandler();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> eventHandler2 = new EventHandlerStub<>(countDownLatch);

        disruptor.handleEventsWith(eventHandler1).then(eventHandler2);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, eventHandler1);
    }

    @Test
    public void shouldSupportAddingCustomEventProcessorWithFactory()
    {
        RingBuffer<TestEvent> rb = disruptor.getRingBuffer();
        BatchEventProcessor<TestEvent> b1 = new BatchEventProcessorBuilder().build(
                rb, rb.newBarrier(), new SleepingEventHandler());
        EventProcessorFactory<TestEvent> b2 = (ringBuffer, barrierSequences) -> new BatchEventProcessorBuilder().build(
                ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler());

        disruptor.handleEventsWith(b1).then(b2);

        disruptor.start();

        assertThat(executor.getExecutionCount(), equalTo(2));
    }

    @Test
    public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor()
        throws Exception
    {
        DelayedEventHandler handler1 = createDelayedEventHandler();
        DelayedEventHandler handler2 = createDelayedEventHandler();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        disruptor.handleEventsWith(handler1, handler2);
        disruptor.after(handler1, handler2).handleEventsWith(handlerWithBarrier);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, handler1, handler2);

        assertThat(executor.getExecutionCount(), equalTo(3));
    }

    @Test
    public void shouldWaitOnAllProducersJoinedByAnd()
        throws Exception
    {
        DelayedEventHandler handler1 = createDelayedEventHandler();
        DelayedEventHandler handler2 = createDelayedEventHandler();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        disruptor.handleEventsWith(handler1);
        final EventHandlerGroup<TestEvent> handler2Group = disruptor.handleEventsWith(handler2);
        disruptor.after(handler1).and(handler2Group).handleEventsWith(handlerWithBarrier);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, handler1, handler2);

        assertThat(executor.getExecutionCount(), equalTo(3));
    }

    @Test
    public void shouldThrowExceptionIfHandlerIsNotAlreadyConsuming()
    {
        assertThrows(IllegalArgumentException.class, () ->
                disruptor.after(createDelayedEventHandler()).handleEventsWith(createDelayedEventHandler()));
    }

    @Test
    public void shouldTrackEventHandlersByIdentityNotEquality()
    {
        assertThrows(IllegalArgumentException.class, () ->
        {
            EvilEqualsEventHandler handler1 = new EvilEqualsEventHandler();
            EvilEqualsEventHandler handler2 = new EvilEqualsEventHandler();

            disruptor.handleEventsWith(handler1);

            // handler2.equals(handler1) but it hasn't yet been registered so should throw exception.
            disruptor.after(handler2);
        });
    }

    @SuppressWarnings("deprecation")
    @Test
    public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors()
        throws Exception
    {
        AtomicReference<Throwable> eventHandled = new AtomicReference<>();
        ExceptionHandler<Object> exceptionHandler = new StubExceptionHandler(eventHandled);
        RuntimeException testException = new RuntimeException();
        ExceptionThrowingEventHandler handler = new ExceptionThrowingEventHandler(testException);

        disruptor.handleExceptionsWith(exceptionHandler);
        disruptor.handleEventsWith(handler);

        publishEvent();

        final Throwable actualException = waitFor(eventHandled);
        assertSame(testException, actualException);
    }

    @SuppressWarnings("deprecation")
    @Test
    public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors()
        throws Exception
    {
        AtomicReference<Throwable> eventHandled = new AtomicReference<>();
        ExceptionHandler<Object> exceptionHandler = new StubExceptionHandler(eventHandled);
        RuntimeException testException = new RuntimeException();
        ExceptionThrowingEventHandler handler = new ExceptionThrowingEventHandler(testException);

        disruptor.handleExceptionsWith(exceptionHandler);
        disruptor.handleEventsWith(handler);
        disruptor.handleExceptionsWith(new FatalExceptionHandler());

        publishEvent();

        final Throwable actualException = waitFor(eventHandled);
        assertSame(testException, actualException);
    }

    @Test
    public void shouldSupportSpecifyingADefaultExceptionHandlerForEventProcessors()
        throws Exception
    {
        AtomicReference<Throwable> eventHandled = new AtomicReference<>();
        ExceptionHandler<Object> exceptionHandler = new StubExceptionHandler(eventHandled);
        RuntimeException testException = new RuntimeException();
        ExceptionThrowingEventHandler handler = new ExceptionThrowingEventHandler(testException);

        disruptor.setDefaultExceptionHandler(exceptionHandler);
        disruptor.handleEventsWith(handler);

        publishEvent();

        final Throwable actualException = waitFor(eventHandled);
        assertSame(testException, actualException);
    }

    @Test
    public void shouldApplyDefaultExceptionHandlerToExistingEventProcessors()
        throws Exception
    {
        AtomicReference<Throwable> eventHandled = new AtomicReference<>();
        ExceptionHandler<Object> exceptionHandler = new StubExceptionHandler(eventHandled);
        RuntimeException testException = new RuntimeException();
        ExceptionThrowingEventHandler handler = new ExceptionThrowingEventHandler(testException);

        disruptor.handleEventsWith(handler);
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        publishEvent();

        final Throwable actualException = waitFor(eventHandled);
        assertSame(testException, actualException);
    }

    @Test
    public void shouldBlockProducerUntilAllEventProcessorsHaveAdvanced()
        throws Exception
    {
        final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
        disruptor.handleEventsWith(delayedEventHandler);

        final RingBuffer<TestEvent> ringBuffer = disruptor.start();
        delayedEventHandler.awaitStart();

        final StubPublisher stubPublisher = new StubPublisher(ringBuffer);
        try
        {
            executor.newThread(stubPublisher).start();

            assertProducerReaches(stubPublisher, 4, true);

            delayedEventHandler.processEvent();
            delayedEventHandler.processEvent();
            delayedEventHandler.processEvent();
            delayedEventHandler.processEvent();
            delayedEventHandler.processEvent();

            assertProducerReaches(stubPublisher, 5, false);
        }
        finally
        {
            stubPublisher.halt();
        }
    }

    @Test
    public void shouldBeAbleToOverrideTheExceptionHandlerForAEventProcessor()
        throws Exception
    {
        final RuntimeException testException = new RuntimeException();
        final ExceptionThrowingEventHandler eventHandler = new ExceptionThrowingEventHandler(testException);
        disruptor.handleEventsWith(eventHandler);

        AtomicReference<Throwable> reference = new AtomicReference<>();
        StubExceptionHandler exceptionHandler = new StubExceptionHandler(reference);
        disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);

        publishEvent();

        final Throwable actualException = waitFor(reference);
        assertSame(testException, actualException);
    }

    @Test
    public void shouldThrowExceptionWhenAddingEventProcessorsAfterTheProducerBarrierHasBeenCreated()
    {
        assertThrows(IllegalStateException.class, () ->
        {
            executor.ignoreExecutions();
            disruptor.handleEventsWith(new SleepingEventHandler());
            disruptor.start();
            disruptor.handleEventsWith(new SleepingEventHandler());
        });
    }

    @Test
    public void shouldThrowExceptionIfStartIsCalledTwice()
    {
        assertThrows(IllegalStateException.class, () ->
        {
            executor.ignoreExecutions();
            disruptor.handleEventsWith(new SleepingEventHandler());
            disruptor.start();
            disruptor.start();
        });
    }

    @Test
    public void shouldSupportCustomProcessorsAsDependencies()
        throws Exception
    {
        RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();

        final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        final BatchEventProcessor<TestEvent> processor =
                new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler);

        disruptor.handleEventsWith(processor).then(handlerWithBarrier);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);

        assertThat(executor.getExecutionCount(), equalTo(2));
    }

    @Test
    public void shouldSupportHandlersAsDependenciesToCustomProcessors()
        throws Exception
    {
        final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
        disruptor.handleEventsWith(delayedEventHandler);


        RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier();
        final BatchEventProcessor<TestEvent> processor =
                new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handlerWithBarrier);
        disruptor.handleEventsWith(processor);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);

        assertThat(executor.getExecutionCount(), equalTo(2));
    }

    @Test
    public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
    {
        final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
        final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
        disruptor.handleEventsWith(delayedEventHandler1);


        RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier();
        final BatchEventProcessor<TestEvent> processor =
                new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, delayedEventHandler2);

        disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler1, delayedEventHandler2);

        assertThat(executor.getExecutionCount(), equalTo(3));
    }

    @Test
    public void shouldSupportMultipleCustomProcessorsAsDependencies() throws Exception
    {
        final RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<>(countDownLatch);

        final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
        final BatchEventProcessor<TestEvent> processor1 =
                new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler1);

        final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
        final BatchEventProcessor<TestEvent> processor2 =
                new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler2);

        disruptor.handleEventsWith(processor1, processor2);
        disruptor.after(processor1, processor2).handleEventsWith(handlerWithBarrier);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler1, delayedEventHandler2);
        assertThat(executor.getExecutionCount(), equalTo(3));
    }

    @Test
    @Timeout(value = 2000, unit = MILLISECONDS)
    public void shouldThrowTimeoutExceptionIfShutdownDoesNotCompleteNormally()
    {
        assertThrows(TimeoutException.class, () ->
        {
            //Given
            final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
            disruptor.handleEventsWith(delayedEventHandler);
            publishEvent();
            //Then
            disruptor.shutdown(1, SECONDS);
        });
    }

    @Test
    @Timeout(value = 1, unit = SECONDS)
    public void shouldTrackRemainingCapacity() throws Exception
    {
        final long[] remainingCapacity = {-1};
        //Given
        final EventHandler<TestEvent> eventHandler = (event, sequence, endOfBatch) ->
                remainingCapacity[0] = disruptor.getRingBuffer().remainingCapacity();

        disruptor.handleEventsWith(eventHandler);

        //When
        publishEvent();

        //Then
        while (remainingCapacity[0] == -1)
        {
            LockSupport.parkNanos(MILLISECONDS.toNanos(100));
        }
        assertThat(remainingCapacity[0], is(ringBuffer.getBufferSize() - 1L));
        assertThat(disruptor.getRingBuffer().remainingCapacity(), is((long) ringBuffer.getBufferSize()));
    }

    @Test
    public void shouldAllowEventHandlerWithSuperType() throws Exception
    {
        final CountDownLatch latch = new CountDownLatch(2);
        final EventHandler<Object> objectHandler = new EventHandlerStub<>(latch);

        disruptor.handleEventsWith(objectHandler);

        ensureTwoEventsProcessedAccordingToDependencies(latch);
    }

    @Test
    public void shouldAllowChainingEventHandlersWithSuperType() throws Exception
    {
        final CountDownLatch latch = new CountDownLatch(2);
        final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
        final EventHandler<Object> objectHandler = new EventHandlerStub<>(latch);

        disruptor.handleEventsWith(delayedEventHandler).then(objectHandler);

        ensureTwoEventsProcessedAccordingToDependencies(latch, delayedEventHandler);
    }

    @Test
    public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
    {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final EventHandler<TestEvent> eventHandler = new EventHandlerStub<>(countDownLatch);

        disruptor.handleEventsWith(
                (EventProcessorFactory<TestEvent>) (ringBuffer, barrierSequences) ->
                {
                    assertEquals(0, barrierSequences.length,
                            "Should not have had any barrier sequences");
                    return new BatchEventProcessorBuilder().build(
                            disruptor.getRingBuffer(), ringBuffer.newBarrier(
                            barrierSequences), eventHandler);
                });

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
    }

    @Test
    public void shouldHonourDependenciesForCustomProcessors() throws Exception
    {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final EventHandler<TestEvent> eventHandler = new EventHandlerStub<>(countDownLatch);
        final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

        final EventProcessorFactory<TestEvent> eventProcessorFactory = (ringBuffer, barrierSequences) ->
        {
            assertSame(1, barrierSequences.length, "Should have had a barrier sequence");
            return new BatchEventProcessorBuilder().build(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                            barrierSequences), eventHandler);
        };
        disruptor.handleEventsWith(delayedEventHandler)
                .then(eventProcessorFactory);

        ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
    }

    private void ensureTwoEventsProcessedAccordingToDependencies(
        final CountDownLatch countDownLatch,
        final DelayedEventHandler... dependencies)
        throws InterruptedException, BrokenBarrierException
    {
        publishEvent();
        publishEvent();

        for (DelayedEventHandler dependency : dependencies)
        {
            assertThatCountDownLatchEquals(countDownLatch, 2L);
            dependency.processEvent();
            dependency.processEvent();
        }

        assertThatCountDownLatchIsZero(countDownLatch);
    }

    private void assertProducerReaches(
        final StubPublisher stubPublisher,
        final int expectedPublicationCount,
        final boolean strict)
    {
        long loopStart = System.currentTimeMillis();
        while (stubPublisher.getPublicationCount() < expectedPublicationCount && System
            .currentTimeMillis() - loopStart < 5000)
        {
            Thread.yield();
        }

        if (strict)
        {
            assertThat(stubPublisher.getPublicationCount(), equalTo(expectedPublicationCount));
        }
        else
        {
            final int actualPublicationCount = stubPublisher.getPublicationCount();
            final String msg = "Producer reached unexpected count. Expected at least " + expectedPublicationCount +
                    " but only reached " + actualPublicationCount;
            assertTrue(actualPublicationCount >= expectedPublicationCount, msg);
        }
    }

    private void createDisruptor()
    {
        disruptor = new Disruptor<>(
                TestEvent.EVENT_FACTORY,
                4,
                executor,
                ProducerType.SINGLE,
                new BlockingWaitStrategy());
    }

    private void publishEvent() throws InterruptedException, BrokenBarrierException
    {
        if (ringBuffer == null)
        {
            ringBuffer = disruptor.start();

            for (DelayedEventHandler eventHandler : delayedEventHandlers)
            {
                eventHandler.awaitStart();
            }
        }

        disruptor.publishEvent((event, sequence) ->
        {
        });
    }

    private Throwable waitFor(final AtomicReference<Throwable> reference)
    {
        while (reference.get() == null)
        {
            Thread.yield();
        }

        return reference.get();
    }

    private DelayedEventHandler createDelayedEventHandler()
    {
        final DelayedEventHandler delayedEventHandler = new DelayedEventHandler();
        delayedEventHandlers.add(delayedEventHandler);
        return delayedEventHandler;
    }

    @SuppressWarnings("SameParameterValue")
    private void assertThatCountDownLatchEquals(
        final CountDownLatch countDownLatch,
        final long expectedCountDownValue)
    {
        assertThat(countDownLatch.getCount(), equalTo(expectedCountDownValue));
    }

    private void assertThatCountDownLatchIsZero(final CountDownLatch countDownLatch)
        throws InterruptedException
    {
        boolean released = countDownLatch.await(TIMEOUT_IN_SECONDS, SECONDS);
        assertTrue(released, "Batch handler did not receive entries: " + countDownLatch.getCount());
    }
}