Back to Repositories

Testing Embedded Canal Server Implementation in alibaba/canal

This test suite implements comprehensive testing for the Canal Server with Embedded functionality in Alibaba’s Canal project. It validates message processing, subscription handling, and high-availability features in an embedded server environment.

Test Coverage Overview

The test suite provides extensive coverage of Canal’s embedded server operations:

  • Message retrieval with and without acknowledgment
  • Subscription management and client identity handling
  • Rollback functionality for message processing
  • High-availability switching mechanisms
  • Empty message handling and retry logic

Implementation Analysis

The testing approach utilizes JUnit framework with a base abstract test class pattern. It implements setup/teardown mechanisms for server initialization and cleanup, while providing concrete test methods for different server operations.

  • Abstract class design for flexible canal configuration
  • Systematic message processing validation
  • Controlled environment testing with embedded server

Technical Details

Testing infrastructure includes:

  • JUnit 4 testing framework
  • Spring utility classes for collection handling
  • Mock MySQL configuration (127.0.0.1:2188)
  • Canal client identity management
  • Embedded server instance configuration
  • Custom canal instance generator

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test isolation through setUp/tearDown methods
  • Comprehensive error handling and assertions
  • Systematic approach to testing different message processing scenarios
  • Clear separation of concerns between test cases
  • Effective use of abstract class for common functionality

alibaba/canal

server/src/test/java/com/alibaba/otter/canal/server/embedded/BaseCanalServerWithEmbededTest.java

            
package com.alibaba.otter.canal.server.embedded;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.CanalHASwitchable;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
@Ignore
public abstract class BaseCanalServerWithEmbededTest {

    protected static final String   cluster1       = "127.0.0.1:2188";
    protected static final String   DESTINATION    = "example";
    protected static final String   DETECTING_SQL  = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
    protected static final String   MYSQL_ADDRESS  = "127.0.0.1";
    protected static final String   USERNAME       = "canal";
    protected static final String   PASSWORD       = "canal";
    protected static final String   FILTER         = ".*\\\\..*";

    private CanalServerWithEmbedded server;
    private ClientIdentity          clientIdentity = new ClientIdentity(DESTINATION, (short) 1);                               ;

    @Before
    public void setUp() {
        server = CanalServerWithEmbedded.instance();
        server.setCanalInstanceGenerator(destination -> {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        });
        server.start();
        server.start(DESTINATION);
    }

    @After
    public void tearDown() {
        server.stop();
    }

    @Test
    public void testGetWithoutAck() {
        int maxEmptyCount = 10;
        int emptyCount = 0;
        int totalCount = 0;
        server.subscribe(clientIdentity);
        while (emptyCount < maxEmptyCount) {
            Message message = server.getWithoutAck(clientIdentity, 11);
            if (CollectionUtils.isEmpty(message.getEntries())) {
                emptyCount++;
                try {
                    Thread.sleep(emptyCount * 300L);
                } catch (InterruptedException e) {
                    Assert.fail();
                }

                System.out.println("empty count : " + emptyCount);
            } else {
                emptyCount = 0;
                totalCount += message.getEntries().size();
                server.ack(clientIdentity, message.getId());
            }
        }

        System.out.println("!!!!!! testGetWithoutAck totalCount : " + totalCount);
        server.unsubscribe(clientIdentity);
    }

    @Test
    public void testGet() {
        int maxEmptyCount = 10;
        int emptyCount = 0;
        int totalCount = 0;
        server.subscribe(clientIdentity);
        while (emptyCount < maxEmptyCount) {
            Message message = server.get(clientIdentity, 11);
            if (CollectionUtils.isEmpty(message.getEntries())) {
                emptyCount++;
                try {
                    Thread.sleep(emptyCount * 300L);
                } catch (InterruptedException e) {
                    Assert.fail();
                }

                System.out.println("empty count : " + emptyCount);
            } else {
                emptyCount = 0;
                totalCount += message.getEntries().size();
            }
        }

        System.out.println("!!!!!! testGet totalCount : " + totalCount);
        server.unsubscribe(clientIdentity);
    }

    // @Test
    public void testRollback() {
        int maxEmptyCount = 10;
        int emptyCount = 0;
        int totalCount = 0;
        server.subscribe(clientIdentity);
        while (emptyCount < maxEmptyCount) {
            Message message = server.getWithoutAck(clientIdentity, 11);
            if (CollectionUtils.isEmpty(message.getEntries())) {
                emptyCount++;
                try {
                    Thread.sleep(emptyCount * 300L);
                } catch (InterruptedException e) {
                    Assert.fail();
                }

                System.out.println("empty count : " + emptyCount);
            } else {
                emptyCount = 0;
                totalCount += message.getEntries().size();
            }
        }
        System.out.println("!!!!!! testRollback totalCount : " + totalCount);

        server.rollback(clientIdentity);// 直接rollback掉,再取一次
        emptyCount = 0;
        totalCount = 0;
        while (emptyCount < maxEmptyCount) {
            Message message = server.getWithoutAck(clientIdentity, 11);
            if (CollectionUtils.isEmpty(message.getEntries())) {
                emptyCount++;
                try {
                    Thread.sleep(emptyCount * 300L);
                } catch (InterruptedException e) {
                    Assert.fail();
                }

                System.out.println("empty count : " + emptyCount);
            } else {
                emptyCount = 0;
                totalCount += message.getEntries().size();
            }
        }

        System.out.println("!!!!!! testRollback after rollback ,  totalCount : " + totalCount);
        server.unsubscribe(clientIdentity);
    }

    // @Test
    public void testSwitch() {
        int maxEmptyCount = 10;
        int emptyCount = 0;
        int totalCount = 0;

        int thresold = 50;
        int batchSize = 11;
        server.subscribe(clientIdentity);
        while (emptyCount < maxEmptyCount) {
            Message message = server.get(clientIdentity, batchSize);
            if (CollectionUtils.isEmpty(message.getEntries())) {
                emptyCount++;
                try {
                    Thread.sleep(emptyCount * 300L);
                } catch (InterruptedException e) {
                    Assert.fail();
                }

                System.out.println("empty count : " + emptyCount);
            } else {
                emptyCount = 0;
                totalCount += message.getEntries().size();

                if ((totalCount + 1) % 100 >= thresold && (totalCount + 1) % 100 <= thresold + batchSize) {
                    CanalEventParser eventParser = server.getCanalInstances().get(DESTINATION).getEventParser();
                    if (eventParser instanceof CanalHASwitchable) {
                        ((CanalHASwitchable) eventParser).doSwitch();// 执行切换
                        try {
                            Thread.sleep(5 * 1000); // 等待parser启动
                        } catch (InterruptedException e) {
                            Assert.fail();
                        }
                    }
                }
            }
        }

        System.out.println("!!!!!! testGet totalCount : " + totalCount);
        server.unsubscribe(clientIdentity);
    }

    abstract protected Canal buildCanal();
}