
Testing BooleanMutex Concurrent Operations in Canal

This test suite validates the BooleanMutex implementation in Canal’s common utilities, focusing on concurrent access patterns and thread synchronization behaviors. The tests verify mutex state management and timeout handling in multi-threaded scenarios.

Test Coverage Overview

The suite contains two tests, which together cover:

  • Concurrent access patterns with multiple threads
  • Mutex state transitions and synchronization
  • Timeout handling and exception scenarios
  • Thread coordination between workers and the test thread with a Phaser

Implementation Analysis

The tests use JUnit 4 together with java.util.concurrent. An ExecutorService runs a fixed pool of worker threads, and a Phaser lines the workers up with the test thread at each phase boundary. Each worker calls mutex.get() and then increments a shared AtomicLong, so the counter records how many workers have got past the mutex.

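The first test checks that no worker gets past mutex.get() until mutex.set(true) is called, and that all of them do afterwards. The second test checks that the workers stay blocked while the mutex is false; it detects this through a timed Phaser wait that ends in a TimeoutException.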
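
The two-phase handshake described above can be sketched in a standalone form. This is a minimal illustration, not the Canal code: GateSketch is a hypothetical stand-in for BooleanMutex (only a blocking get() and a set(boolean) are assumed), and PhaserHandshakeSketch, WORKERS, and run() are names invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for BooleanMutex (an assumption for illustration,
// not Canal's implementation): get() blocks until set(true) has been called.
class GateSketch {
    private boolean open = false;

    synchronized void get() throws InterruptedException {
        while (!open) {
            wait();
        }
    }

    synchronized void set(boolean value) {
        open = value;
        if (open) {
            notifyAll();
        }
    }
}

class PhaserHandshakeSketch {
    static final int WORKERS = 4;

    // Returns {count observed before the gate opens, count after the second phase}.
    static long[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
        try {
            GateSketch gate = new GateSketch();
            AtomicLong passed = new AtomicLong();
            Phaser phaser = new Phaser(WORKERS + 1); // workers plus the calling thread

            for (int i = 0; i < WORKERS; i++) {
                pool.submit(() -> {
                    try {
                        // first phase: wait until every party has started
                        phaser.awaitAdvanceInterruptibly(phaser.arrive());
                        gate.get();               // blocks while the gate is closed
                        passed.incrementAndGet();
                        phaser.arrive();          // second phase: signal, do not wait
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }

            phaser.awaitAdvanceInterruptibly(phaser.arrive());
            long before = passed.get();           // gate still closed, nobody has passed
            gate.set(true);
            phaser.awaitAdvanceInterruptibly(phaser.arrive());
            long after = passed.get();            // every worker arrived after incrementing
            return new long[] {before, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long[] counts = run();
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```

Registering WORKERS + 1 parties is what makes the calling thread part of every phase: the first read of the counter happens only after all workers have started, and the second only after all of them have incremented it.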

Technical Details

Testing infrastructure includes:

  • JUnit 4 testing framework
  • java.util.concurrent utilities (ExecutorService, Phaser)
  • Atomic operations via AtomicLong
  • Fixed thread pool sized by the CONCURRENCY constant (10)
  • Per-test time limits set with @Test(timeout = ...): 3 seconds and 30 seconds

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Thread pool created in @Before and shut down with shutdownNow() in @After
  • Explicit timeout specifications for concurrent operations
  • Controlled concurrent access testing
  • Clear separation of setup, execution, and verification phases
  • Expected-exception verification with @Test(expected = TimeoutException.class)
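
The timeout and cleanup practices listed above can be sketched without JUnit. In this hedged example a CountDownLatch that is never released stands in for a mutex that stays false; TimedAwaitSketch and timesOut are hypothetical names, and the 200 ms limit is an arbitrary choice for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedAwaitSketch {

    // Returns true if the timed wait gave up with a TimeoutException.
    static boolean timesOut(long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch neverReleased = new CountDownLatch(1); // stand-in for a closed mutex
        Phaser phaser = new Phaser(2);                        // one worker plus the caller
        try {
            pool.submit(() -> {
                try {
                    neverReleased.await(); // blocks, like get() on a mutex that stays false
                    phaser.arrive();       // never reached in this sketch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // woken by shutdownNow() below
                }
            });

            int phase = phaser.arrive();
            // Only one of the two parties has arrived, so the phase cannot advance.
            phaser.awaitAdvanceInterruptibly(phase, timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupt the blocked worker so the pool can terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(200));
    }
}
```

The finally block plays the role of the @After method: without shutdownNow() the blocked worker would keep the pool, and with it the JVM, alive.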

alibaba/canal

common/src/test/java/com/alibaba/otter/canal/common/utils/BooleanMutexTest.java

            
package com.alibaba.otter.canal.common.utils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;

public class BooleanMutexTest {

    public static final int CONCURRENCY = 10;
    private ExecutorService executorService;

    @Before
    public void setUp() {
        executorService = Executors.newFixedThreadPool(CONCURRENCY);
    }

    @After
    public void tearDown() {
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    @Test(timeout = 3000L)
    public void testBooleanMutexGet() throws Exception {

        BooleanMutex mutex = new BooleanMutex();

        AtomicLong atomicLong = new AtomicLong(0);

        Phaser phaser = new Phaser(CONCURRENCY + 1);

        for (int i = 0; i < CONCURRENCY; i++) {
            executorService.submit(() -> {
                try {
                    // arrive at phase 1 and wait until every party has arrived
                    int phase1 = phaser.arrive();
                    phaser.awaitAdvanceInterruptibly(phase1);

                    mutex.get();
                    atomicLong.addAndGet(1);

                    // arrive at phase 2 without waiting
                    phaser.arrive();
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            });
        }

        assertEquals(0, atomicLong.get());

        // arrive at phase 1 and wait until every worker has arrived
        int phase1 = phaser.arrive();
        phaser.awaitAdvanceInterruptibly(phase1);
        assertEquals(0, atomicLong.get());

        mutex.set(true);

        // arrive at phase 2 and wait until every worker has passed mutex.get()
        int phase2 = phaser.arrive();
        phaser.awaitAdvanceInterruptibly(phase2);
        assertEquals(CONCURRENCY, atomicLong.get());
    }


    @Test(timeout = 30000L, expected = TimeoutException.class)
    public void testBooleanMutexBlock() throws Exception {

        BooleanMutex mutex = new BooleanMutex();

        AtomicLong atomicLong = new AtomicLong(0);

        Phaser phaser = new Phaser(CONCURRENCY + 1);

        for (int i = 0; i < CONCURRENCY; i++) {
            executorService.submit(() -> {
                try {
                    // arrive at phase 1 and wait until every party has arrived
                    int phase1 = phaser.arrive();
                    phaser.awaitAdvanceInterruptibly(phase1);

                    mutex.get();
                    atomicLong.addAndGet(1);

                    // arrive at phase 2 without waiting (never reached in this test)
                    phaser.arrive();
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            });
        }

        assertEquals(0, atomicLong.get());

        // arrive at phase 1 and wait until every worker has arrived
        int phase1 = phaser.arrive();
        phaser.awaitAdvanceInterruptibly(phase1);
        assertEquals(0, atomicLong.get());

        // the mutex starts out false, so set(false) leaves it closed
        mutex.set(false);


        // arrive at phase 2 and wait, with a time limit, for the workers
        int phase2 = phaser.arrive();
        // throws TimeoutException after 2 seconds: the workers are blocked in mutex.get() and never arrive
        phaser.awaitAdvanceInterruptibly(phase2, 2, TimeUnit.SECONDS);

        fail("unreachable code");
    }
}