Back to Repositories

Testing Client Running Monitor Coordination in Alibaba Canal

This test suite validates the client running monitoring functionality in Alibaba Canal, focusing on client lifecycle management and multi-server coordination using ZooKeeper.

Test Coverage Overview

The test suite provides comprehensive coverage of client running states and monitoring capabilities:

  • Single server lifecycle testing with start/stop operations
  • Multi-server coordination testing with concurrent client instances
  • Client ID management and address tracking
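  • ZooKeeper path handling: the destination path is cleared and a persistent client-ID node is created before each test, and the path is removed again afterwards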

Implementation Analysis

The testing approach utilizes JUnit for structured test execution, implementing both single-instance and concurrent testing scenarios. The suite employs CountDownLatch for synchronization and ExecutorService for managing parallel client operations.

Key patterns include:
  • Private factory method (buildClientRunning) that configures each ClientRunningMonitor through its setters
  • Listener pattern for client state changes
  • Thread pool execution for concurrent testing

Technical Details

Testing infrastructure includes:

  • ZkClientx for ZooKeeper integration
  • JUnit 4 test framework
  • ClientRunningMonitor for state management
  • CountDownLatch for synchronization control
  • ExecutorService for concurrent execution
  • Random delays for realistic testing scenarios

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test setup and teardown with @Before and @After annotations
  • Isolation of test scenarios
  • Comprehensive concurrent testing
  • Clear separation of concerns between test cases
  • Effective use of assertion mechanisms
  • Robust cleanup of test resources

alibaba/canal

client/src/test/java/com/alibaba/otter/canal/client/running/ClientRunningTest.java

            
package com.alibaba.otter.canal.client.running;

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.math.RandomUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
@Ignore
public class ClientRunningTest extends AbstractZkTest {

    private ZkClientx zkclientx = new ZkClientx(cluster1 + ";" + cluster2);
    private short     clientId  = 1001;

    @Before
    public void setUp() {
        String path = ZookeeperPathUtils.getDestinationPath(destination);
        zkclientx.deleteRecursive(path);

        zkclientx.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientId), true);
    }

    @After
    public void tearDown() {
        String path = ZookeeperPathUtils.getDestinationPath(destination);
        zkclientx.deleteRecursive(path);
    }

    @Test
    public void testOneServer() {
        final CountDownLatch countLatch = new CountDownLatch(2);
        ClientRunningMonitor runningMonitor = buildClientRunning(countLatch, clientId, 2088);
        runningMonitor.start();
        sleep(2000L);
        runningMonitor.stop();
        sleep(2000L);

        if (countLatch.getCount() != 0) {
            Assert.fail();
        }
    }

    @Test
    public void testMultiServer() {
        final CountDownLatch countLatch = new CountDownLatch(30);
        final ClientRunningMonitor runningMonitor1 = buildClientRunning(countLatch, clientId, 2088);
        final ClientRunningMonitor runningMonitor2 = buildClientRunning(countLatch, clientId, 2089);
        final ClientRunningMonitor runningMonitor3 = buildClientRunning(countLatch, clientId, 2090);
        final ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                if (!runningMonitor1.isStart()) {
                    runningMonitor1.start();
                }
                sleep(2000L + RandomUtils.nextInt(500));
                runningMonitor1.stop();
                sleep(2000L + RandomUtils.nextInt(500));
            }
        });

        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                if (!runningMonitor2.isStart()) {
                    runningMonitor2.start();
                }
                sleep(2000L + RandomUtils.nextInt(500));
                runningMonitor2.stop();
                sleep(2000L + RandomUtils.nextInt(500));
            }
        });

        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                if (!runningMonitor3.isStart()) {
                    runningMonitor3.start();
                }
                sleep(2000L + RandomUtils.nextInt(500));
                runningMonitor3.stop();
                sleep(2000L + RandomUtils.nextInt(500));
            }
        });

        sleep(30000L);
    }

    private ClientRunningMonitor buildClientRunning(final CountDownLatch countLatch, final short clientId,
                                                    final int port) {
        ClientRunningData clientData = new ClientRunningData();
        clientData.setClientId(clientId);
        clientData.setAddress(AddressUtils.getHostIp());

        ClientRunningMonitor runningMonitor = new ClientRunningMonitor();
        runningMonitor.setDestination(destination);
        runningMonitor.setZkClient(zkclientx);
        runningMonitor.setClientData(clientData);
        runningMonitor.setListener(new ClientRunningListener() {

            public InetSocketAddress processActiveEnter() {
                System.out.println(String.format("clientId:%s port:%s has start", clientId, port));
                countLatch.countDown();
                return new InetSocketAddress(AddressUtils.getHostIp(), port);
            }

            public void processActiveExit() {
                countLatch.countDown();
                System.out.println(String.format("clientId:%s port:%s has stop", clientId, port));
            }

        });
        runningMonitor.setDelayTime(1);
        return runningMonitor;
    }
}