
Testing Fatal Exception Shutdown Behavior in LMAX-Disruptor

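This test suite validates that the LMAX Disruptor shuts down gracefully after an event handler throws while a FatalExceptionHandler is installed. It contains a single test: the handler fails on the third event, the remaining events are still published, and disruptor.shutdown() is then called in teardown. The test makes no explicit assertions about recovery or data integrity; it passes if the test method finishes within its one-second timeout and shutdown returns.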

Test Coverage Overview

The test suite focuses on verifying the Disruptor’s shutdown mechanism when fatal exceptions occur during event processing.

Key areas covered include:
  • Graceful shutdown with active FatalExceptionHandler
  • Event publishing under error conditions
  • byte[] event handling with a deliberate failure
  • Resource cleanup during shutdown

Implementation Analysis

The test uses JUnit 5 with a one-second @Timeout on the test method. A custom FailingEventHandler throws an IllegalStateException on the third event it receives, so the exception is handed to the FatalExceptionHandler registered through setDefaultExceptionHandler.
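The failure injection described above can be illustrated without the Disruptor on the classpath. The following is a minimal sketch, not the test's own code: the class FailOnNthCall and its run helper are hypothetical names, and a plain loop stands in for the event processor.

```java
/**
 * Hypothetical stand-in for the test's FailingEventHandler.
 * It counts calls and throws exactly once, on the configured call.
 */
final class FailOnNthCall {
    private final int failOn;
    private int count = 0;

    FailOnNthCall(int failOn) {
        this.failOn = failOn;
    }

    void onEvent(byte[] event) {
        count++;
        if (count == failOn) {
            throw new IllegalStateException("injected failure on call " + count);
        }
    }

    /** Feeds {@code total} events to the handler and returns how many calls threw. */
    static int run(int failOn, int total) {
        FailOnNthCall handler = new FailOnNthCall(failOn);
        int failures = 0;
        for (int i = 0; i < total; i++) {
            try {
                handler.onEvent(new byte[32]);
            } catch (IllegalStateException e) {
                failures++; // assumption: this sketch swallows the failure and keeps going
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Nine events, failure injected on the third: exactly one call throws.
        System.out.println(run(3, 9)); // prints 1
    }
}
```

The sketch catches the exception so the loop can continue. In the actual test the exception is passed to the Disruptor's configured exception handler instead.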

Notable patterns include:
  • Setup/teardown lifecycle management
  • Controlled failure injection
  • Event translation pattern for byte array handling
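As a rough illustration of the event translation pattern listed above, the sketch below copies a 32-byte payload into a preallocated 256-byte slot, the same sizes the test uses. TranslateSketch is a hypothetical name and the code does not depend on the Disruptor. The translateTo method only mirrors the body of the test's ByteArrayTranslator.

```java
import java.util.Arrays;

/** Hypothetical, Disruptor-free illustration of copying a payload into a preallocated event. */
final class TranslateSketch {
    private TranslateSketch() { }

    /** Copies the whole payload to the start of the slot; bytes past the payload are left untouched. */
    static void translateTo(byte[] payload, byte[] slot) {
        System.arraycopy(payload, 0, slot, 0, payload.length);
    }

    public static void main(String[] args) {
        byte[] slot = new byte[256];    // same size as ByteArrayFactory(256) produces in the test
        byte[] payload = new byte[32];  // same size as the test's random payloads
        Arrays.fill(payload, (byte) 7);

        translateTo(payload, slot);

        // Last payload byte was copied; the next slot byte keeps its previous value.
        System.out.println(slot[31] + " " + slot[32]); // prints 7 0
    }
}
```

Because the slot is reused rather than reallocated, only the first payload.length bytes change on each publish; anything beyond that range is whatever an earlier event left there.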

Technical Details

Testing infrastructure includes:
  • JUnit Jupiter test framework
  • Disruptor configured with ProducerType.SINGLE and a 1024-slot ring buffer
  • BlockingWaitStrategy for event processing
  • DaemonThreadFactory for thread management
  • Custom ByteArrayFactory creating 256-byte events

Best Practices Demonstrated

The test demonstrates several good testing practices:

  • Proper test isolation with @BeforeEach/@AfterEach
  • A timeout on the test method, so a stalled test fails instead of running indefinitely
  • Clear separation of concerns between event handling and translation
  • Effective resource management and cleanup
  • Controlled error injection for edge case testing
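The reason for putting a time limit on a test like this can be shown with a small JDK-only sketch. This is not how JUnit implements @Timeout; TimeoutGuard and finishesWithin are hypothetical names used only to show that a bounded wait turns a hang into a detectable result.

```java
/** Hypothetical helper: runs a task on a daemon thread and waits a bounded time for it. */
final class TimeoutGuard {
    private TimeoutGuard() { }

    /**
     * Returns true if the task finished within {@code millis}, false if it was still running.
     * {@code millis} must be positive, because Thread.join(0) waits without a limit.
     */
    static boolean finishesWithin(Runnable task, long millis) {
        Thread worker = new Thread(task, "guarded-task");
        worker.setDaemon(true); // a stuck task must not keep the JVM alive
        worker.start();
        try {
            worker.join(millis); // wait at most `millis` for the task to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        // A task that returns immediately is within the limit.
        System.out.println(finishesWithin(() -> { }, 1000));

        // A task that sleeps far longer than the limit is reported as not finished.
        System.out.println(finishesWithin(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
        }, 100));
    }
}
```

The first call reports true and the second false, which is the distinction a timeout gives a test runner: a hang becomes a failure rather than a blocked run.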

lmax-exchange/disruptor

src/test/java/com/lmax/disruptor/ShutdownOnFatalExceptionTest.java

            
package com.lmax.disruptor;

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ShutdownOnFatalExceptionTest
{

    private final Random random = new Random();

    private final FailingEventHandler eventHandler = new FailingEventHandler();

    private Disruptor<byte[]> disruptor;

    @SuppressWarnings("unchecked")
    @BeforeEach
    public void setUp()
    {
        disruptor = new Disruptor<>(
                new ByteArrayFactory(256), 1024, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE,
                new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
        disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
    }

    @Test
    @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
    public void shouldShutdownGracefulEvenWithFatalExceptionHandler()
    {
        disruptor.start();

        byte[] bytes;
        for (int i = 1; i < 10; i++)
        {
            bytes = new byte[32];
            random.nextBytes(bytes);
            disruptor.publishEvent(new ByteArrayTranslator(bytes));
        }
    }

    @AfterEach
    public void tearDown()
    {
        disruptor.shutdown();
    }

    private static class ByteArrayTranslator implements EventTranslator<byte[]>
    {

        private final byte[] bytes;

        ByteArrayTranslator(final byte[] bytes)
        {
            this.bytes = bytes;
        }

        @Override
        public void translateTo(final byte[] event, final long sequence)
        {
            System.arraycopy(bytes, 0, event, 0, bytes.length);
        }
    }

    private static class FailingEventHandler implements EventHandler<byte[]>
    {
        private int count = 0;

        @Override
        public void onEvent(final byte[] event, final long sequence, final boolean endOfBatch) throws Exception
        {
            // Count every event and throw on the third one to simulate a failing handler.
            count++;
            if (count == 3)
            {
                throw new IllegalStateException();
            }
        }
    }

    private static class ByteArrayFactory implements EventFactory<byte[]>
    {
        private int eventSize;

        ByteArrayFactory(final int eventSize)
        {
            this.eventSize = eventSize;
        }

        @Override
        public byte[] newInstance()
        {
            return new byte[eventSize];
        }
    }
}