
Testing ClickHouse Batch Synchronization Thread Safety in Canal

This test class checks the thread safety of ClickHouse batch synchronization in the Canal client adapter. It calls the adapter's sync method concurrently from several worker threads to see whether synchronization stays reliable in a multi-threaded environment.

Test Coverage Overview

The class contains a single test method, test01, that exercises concurrent batch synchronization against ClickHouse.

Key areas tested include:
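  • Multiple concurrent insert and update operations
  • Sync calls spread across two tables, user and customer, in the mytest database
  • An AtomicInteger counter that gives every operation a unique ID
  • Rows that combine per-operation values (id, name, role_id, c_time) with a fixed mixed ASCII and Chinese string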

Implementation Analysis

The test takes a multi-threaded approach built on Java’s ExecutorService, using a fixed thread pool of 5 threads.

Notable patterns include:
  • Atomic counter usage for thread-safe ID generation
  • Random operation selection between INSERT and UPDATE
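  • Concurrent execution of 10 tasks with 300 operations each (3,000 sync calls in total)
  • Per-operation data structures: each Dml, row list, and row map is created inside the loop, so only the adapter and the counter are shared between threads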
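The workload shape described above can be sketched without ClickHouse. This is an illustrative stand-in under stated assumptions: the class RandomOperationSketch and its run method are hypothetical, and the call to the adapter is replaced by two counters. One detail worth noting is that Random.nextInt(bound) and ThreadLocalRandom.nextInt(bound) exclude the bound, so the bound has to be the array length for both operations to be eligible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the adapter call is replaced by counters.
class RandomOperationSketch {

    static final String[] OPERATIONS = { "INSERT", "UPDATE" };

    // Returns { insertCount, updateCount } after all tasks have finished.
    static int[] run(int tasks, int opsPerTask) {
        AtomicInteger inserts = new AtomicInteger();
        AtomicInteger updates = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    for (int j = 0; j < opsPerTask; j++) {
                        // nextInt(2) yields 0 or 1; nextInt(1) would always yield 0
                        int index = ThreadLocalRandom.current().nextInt(OPERATIONS.length);
                        if ("INSERT".equals(OPERATIONS[index])) {
                            inserts.incrementAndGet();
                        } else {
                            updates.incrementAndGet();
                        }
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows any task failure as ExecutionException
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] { inserts.get(), updates.get() };
    }

    public static void main(String[] args) {
        int[] counts = run(10, 300);
        System.out.println("INSERT=" + counts[0] + " UPDATE=" + counts[1]);
    }
}
```

The two counts vary from run to run, but they always add up to tasks × opsPerTask.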

Technical Details

Testing infrastructure includes:
  • JUnit test framework with @Before and @Test annotations (the class is also annotated with @Ignore, so JUnit skips it unless that annotation is removed)
  • ExecutorService for thread pool management
  • AtomicInteger for thread-safe counting
  • Custom ClickHouseAdapter implementation
  • Logback for logging configuration
  • Random data generation utilities

Best Practices Demonstrated

The test suite exemplifies several testing best practices for concurrent systems:

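  • A fixed-size thread pool created in setup rather than ad hoc threads
  • Shared state limited to the adapter under test and an AtomicInteger counter
  • Calling get() on every Future, so an exception thrown inside a worker task fails the test
  • Waiting for all futures to complete, then sleeping 10 seconds before the test method returns
  • Clear separation of setup and test execution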
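Future result verification and pool cleanup can be sketched as follows. This is a generic illustration, not the test's own code: the class FutureVerificationSketch and its methods are hypothetical names, and the 10-second timeout is an arbitrary choice. It shows that a failure inside a submitted task only becomes visible when get() is called on its Future, and that the pool is shut down in a finally block whether or not a task failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of the Canal code base.
class FutureVerificationSketch {

    // Runs all tasks on a fixed pool and returns how many of them threw.
    static int countFailures(List<Runnable> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        int failures = 0;
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable task : tasks) {
                futures.add(pool.submit(task));
            }
            for (Future<?> future : futures) {
                try {
                    future.get(); // blocks until the task is done
                } catch (ExecutionException e) {
                    failures++;   // the task threw; e.getCause() holds the original exception
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // stop accepting work
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    pool.shutdownNow(); // cancel tasks that are still running
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        return failures;
    }

    // One task that succeeds and one that fails.
    static int demo() {
        List<Runnable> tasks = Arrays.asList(
            () -> { },
            () -> { throw new IllegalStateException("boom"); });
        return countFailures(tasks);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```

In a JUnit 4 class, the same shutdown logic could live in a method annotated with @After, so the pool is released after every test method.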

alibaba/canal

client-adapter/clickhouse/src/test/java/com/alibaba/otter/canal/client/adapter/clickhouse/ClickHouseBatchSyncThreadSafeTest.java

            
package com.alibaba.otter.canal.client.adapter.clickhouse;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.client.adapter.clickhouse.sync.Common;
import com.alibaba.otter.canal.client.adapter.support.Dml;

import ch.qos.logback.classic.Level;

/**
 * @author: Xander
 * @date: Created in 2023/11/13 22:27
 * @email: [email protected]
 * @description: Testing thread safe
 */

@Ignore
public class ClickHouseBatchSyncThreadSafeTest {

    private ClickHouseAdapter clickHouseAdapter;

    private ExecutorService   executorService;

    private String[]          operations = new String[] { "INSERT", "UPDATE" };

    private String[]          tables     = new String[] { "user", "customer" };

    @Before
    public void init() {
        clickHouseAdapter = Common.init();
        Common.setLogLevel(Level.INFO);
        executorService = Executors.newFixedThreadPool(5);
    }

    @Test
    public void test01() throws InterruptedException, ExecutionException {
        List<Future<?>> list = new ArrayList<>();
        AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            list.add(executorService.submit(() -> {
                for (int j = 0; j < 300; j++) {
                    Random random = new Random();
                    int cou = count.incrementAndGet();
                    // pick INSERT or UPDATE at random; nextInt(1) would always return 0 (INSERT)
                    String dmlType = operations[random.nextInt(operations.length)];
                    Dml dml = new Dml();
                    dml.setDestination("example");
                    dml.setTs(new Date().getTime());
                    dml.setType(dmlType);
                    dml.setDatabase("mytest");
                    dml.setTable(tables[random.nextInt(tables.length)]);
                    List<Map<String, Object>> dataList = new ArrayList<>();
                    Map<String, Object> data = new LinkedHashMap<>();
                    dataList.add(data);
                    data.put("id", cou);
                    data.put("name", "Eric" + cou);
                    data.put("role_id", cou);
                    data.put("c_time", new Date());
                    data.put("test1", "sdfasdfawe中国asfwef");
                    dml.setData(dataList);
                    clickHouseAdapter.sync(Collections.singletonList(dml));
                }
            }));

        }
        for (Future<?> future : list) {
            future.get();
        }
        Thread.sleep(10000L); // wait so that work started by the worker threads can finish
    }

}