Back to Repositories

Testing Canal Server Protocol Implementation in alibaba/canal

This test suite validates the core functionality of Canal Server, focusing on client-server communication, authentication flows, and message handling in Alibaba’s Canal system. It implements comprehensive testing of the networking layer and protocol-specific operations.

Test Coverage Overview

The test suite provides extensive coverage of Canal Server’s networking capabilities with focus on:
  • Client authentication and handshake protocols
  • Subscription and unsubscription flows
  • Message batch processing and acknowledgment
  • Client rollback operations
  • Error handling and timeout scenarios

Implementation Analysis

The testing approach utilizes JUnit framework with socket-level communication testing. It implements low-level ByteBuffer operations for packet handling and employs builder patterns for constructing protocol messages. The test demonstrates both happy path and error scenarios using Canal’s custom protocol implementation.

Technical Details

Key technical components include:
  • JUnit 4 testing framework
  • Java NIO SocketChannel for network operations
  • Protocol Buffers for message serialization
  • Custom ByteBuffer handling for message framing
  • Embedded Canal server configuration
  • Mock MySQL connection parameters

Best Practices Demonstrated

The test exhibits several testing best practices:
  • Proper test setup and teardown with @Before and @After annotations
  • Comprehensive error handling and validation
  • Isolation of network communication logic
  • Clear separation of test configuration and execution
  • Systematic verification of protocol states

alibaba/canal

server/src/test/java/com/alibaba/otter/canal/server/CanalServerTest.java

            
package com.alibaba.otter.canal.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
import com.alibaba.otter.canal.protocol.CanalPacket.Ack;
import com.alibaba.otter.canal.protocol.CanalPacket.ClientAck;
import com.alibaba.otter.canal.protocol.CanalPacket.ClientAuth;
import com.alibaba.otter.canal.protocol.CanalPacket.ClientRollback;
import com.alibaba.otter.canal.protocol.CanalPacket.Get;
import com.alibaba.otter.canal.protocol.CanalPacket.Handshake;
import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
import com.alibaba.otter.canal.protocol.CanalPacket.Sub;
import com.alibaba.otter.canal.protocol.CanalPacket.Unsub;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import com.alibaba.otter.canal.server.netty.NettyUtils;

@Ignore
public class CanalServerTest {

    protected static final String cluster1      = "127.0.0.1:2188";
    protected static final String DESTINATION   = "ljhtest1";
    protected static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "retl";
    protected static final String PASSWORD      = "retl";
    protected static final String FILTER        = "retl\\..*,erosa.canaltable1s,erosa.canaltable1t";

    private final ByteBuffer      header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty  nettyServer;

    @Before
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(destination -> {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        });

        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(1088);
        nettyServer.start();
    }

    @Test
    public void testAuth() {

        try {
            SocketChannel channel = SocketChannel.open();
            channel.connect(new InetSocketAddress("127.0.0.1", 1088));
            Packet p = Packet.parseFrom(readNextPacket(channel));

            if (p.getVersion() != 1) {
                throw new Exception("unsupported version at this client.");
            }

            if (p.getType() != PacketType.HANDSHAKE) {
                throw new Exception("expect handshake but found other type.");
            }
            //
            Handshake handshake = Handshake.parseFrom(p.getBody());
            System.out.println(handshake.getSupportedCompressions());
            //
            ClientAuth ca = ClientAuth.newBuilder()
                .setUsername("")
                .setNetReadTimeout(10000)
                .setNetWriteTimeout(10000)
                .build();
            writeWithHeader(channel,
                Packet.newBuilder()
                    .setType(PacketType.CLIENTAUTHENTICATION)
                    .setVersion(NettyUtils.VERSION)
                    .setBody(ca.toByteString())
                    .build()
                    .toByteArray());
            //
            p = Packet.parseFrom(readNextPacket(channel));
            if (p.getType() != PacketType.ACK) {
                throw new Exception("unexpected packet type when ack is expected");
            }

            Ack ack = Ack.parseFrom(p.getBody());
            if (ack.getErrorCode() > 0) {
                throw new Exception("something goes wrong when doing authentication: " + ack.getErrorMessage());
            }

            writeWithHeader(channel, Packet.newBuilder()
                .setType(PacketType.SUBSCRIPTION)
                .setVersion(NettyUtils.VERSION)
                .setBody(Sub.newBuilder().setDestination(DESTINATION).setClientId("1").build().toByteString())
                .build()
                .toByteArray());
            //
            p = Packet.parseFrom(readNextPacket(channel));
            ack = Ack.parseFrom(p.getBody());
            if (ack.getErrorCode() > 0) {
                throw new Exception("failed to subscribe with reason: " + ack.getErrorMessage());
            }

            for (int i = 0; i < 10; i++) {
                writeWithHeader(channel,
                    Packet.newBuilder()
                        .setType(PacketType.GET)
                        .setVersion(NettyUtils.VERSION)
                        .setBody(Get.newBuilder()
                            .setDestination(DESTINATION)
                            .setClientId("1")
                            .setFetchSize(10)
                            .build()
                            .toByteString())
                        .build()
                        .toByteArray());
                p = Packet.parseFrom(readNextPacket(channel));

                long batchId = -1L;
                switch (p.getType()) {
                    case MESSAGES: {
                        Messages messages = Messages.parseFrom(p.getBody());
                        batchId = messages.getBatchId();
                        break;
                    }
                    case ACK: {
                        ack = Ack.parseFrom(p.getBody());
                        if (ack.getErrorCode() > 0) {
                            throw new Exception("failed to subscribe with reason: " + ack.getErrorMessage());
                        }
                        break;
                    }
                    default: {
                        throw new Exception("unexpected packet type: " + p.getType());
                    }
                }

                System.out.println("!!!!!!!!!!!!!!!!! " + batchId);
                Thread.sleep(1000L);
                writeWithHeader(channel,
                    Packet.newBuilder()
                        .setType(PacketType.CLIENTACK)
                        .setVersion(NettyUtils.VERSION)
                        .setBody(ClientAck.newBuilder()
                            .setDestination(DESTINATION)
                            .setClientId("1")
                            .setBatchId(batchId)
                            .build()
                            .toByteString())
                        .build()
                        .toByteArray());
            }

            writeWithHeader(channel,
                Packet.newBuilder()
                    .setType(PacketType.CLIENTROLLBACK)
                    .setVersion(NettyUtils.VERSION)
                    .setBody(ClientRollback.newBuilder()
                        .setDestination(DESTINATION)
                        .setClientId("1")
                        .build()
                        .toByteString())
                    .build()
                    .toByteArray());

            writeWithHeader(channel,
                Packet.newBuilder()
                    .setType(PacketType.UNSUBSCRIPTION)
                    .setVersion(NettyUtils.VERSION)
                    .setBody(Unsub.newBuilder().setDestination(DESTINATION).setClientId("1").build().toByteString())
                    .build()
                    .toByteArray());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @After
    public void tearDown() {
        nettyServer.stop();
    }

    private byte[] readNextPacket(SocketChannel channel) throws IOException {
        header.clear();
        read(channel, header);
        int bodyLen = header.getInt(0);
        ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen);
        read(channel, bodyBuf);
        return bodyBuf.array();
    }

    private void writeWithHeader(SocketChannel channel, byte[] body) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(4);
        header.putInt(body.length);
        header.flip();
        int len = channel.write(header);
        assert (len == header.capacity());

        channel.write(ByteBuffer.wrap(body));
    }

    private void read(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            int r = channel.read(buffer);
            if (r == -1) {
                throw new IOException("end of stream when reading header");
            }
        }
    }

    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");

        CanalParameter parameter = new CanalParameter();

        parameter.setZkClusters(Arrays.asList("127.0.0.1:2188"));
        parameter.setMetaMode(MetaMode.MEMORY);
        parameter.setHaMode(HAMode.HEARTBEAT);
        parameter.setIndexMode(IndexMode.MEMORY);

        parameter.setStorageMode(StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);

        parameter.setSourcingType(SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
            new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
        parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}",
            "{\"journalName\":\"mysql-bin.000001\",\"position\":6163L,\"timestamp\":1322803601000L}"));

        parameter.setSlaveId(1234L);

        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);

        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);

        canal.setCanalParameter(parameter);
        return canal;
    }
}