Back to Repositories

Validating Protocol Buffer Message Handling in Canal

This test suite validates the Canal protocol implementation, focusing on message packet construction, parsing, and error handling in the Alibaba Canal data synchronization system. It ensures reliable communication between Canal components through protocol buffer serialization and deserialization.

Test Coverage Overview

The test suite provides comprehensive coverage of the Canal protocol’s core functionality.

Key areas tested include:
  • Message packet construction and serialization
  • Protocol buffer encoding/decoding
  • Error handling for unsupported compression types
  • Binary message format validation
  • Header and entry building verification

Implementation Analysis

The testing approach employs JUnit for unit testing Protocol Buffer message handling. The implementation uses builder patterns for constructing protocol messages, with explicit verification of packet types and compression modes. The tests leverage Google Protocol Buffers for message serialization and proper byte-level handling.

Technical patterns include:
  • Builder pattern for Header and Entry construction
  • Byte-level message encoding
  • Exception handling validation
  • Protocol buffer message parsing

Technical Details

Testing infrastructure includes:
  • JUnit 4 test framework
  • Google Protocol Buffers for message serialization
  • Custom byte array handling utilities
  • CodedOutputStream for binary message construction
  • Message size calculation and validation

Best Practices Demonstrated

The test suite exemplifies several testing best practices for protocol implementation verification. It includes proper exception handling verification, explicit size calculations, and thorough message construction validation.

Notable practices:
  • Expected exception testing
  • Comprehensive message construction verification
  • Explicit compression type validation
  • Proper resource handling
  • Clear test case organization

alibaba/canal

protocol/src/test/java/com/alibaba/otter/canal/protocol/ProtocolTest.java

            
package com.alibaba.otter.canal.protocol;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ProtocolTest {

    @Test(expected = CanalClientException.class)
    public void testSimple() throws IOException {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName("mysql-bin.000001");
        headerBuilder.setLogfileOffset(1024);
        headerBuilder.setExecuteTime(1024);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        entryBuilder.setEntryType(EntryType.ROWDATA);
        Entry entry = entryBuilder.build();
        Message message = new Message(3, true, Arrays.asList(entry.toByteString()));

        byte[] body = buildData(message);
        Packet packet = Packet.parseFrom(body);
        switch (packet.getType()) {
            case MESSAGES: {
                if (!packet.getCompression().equals(Compression.NONE)) {
                    throw new CanalClientException("compression is not supported in this connector");
                }

                Messages messages = Messages.parseFrom(packet.getBody());
                Message result = new Message(messages.getBatchId());
                for (ByteString byteString : messages.getMessagesList()) {
                    result.addEntry(Entry.parseFrom(byteString));
                }

                System.out.println(result);
                break;
            }
            default: {
                throw new CanalClientException("unexpected packet type: " + packet.getType());
            }
        }
    }

    @SuppressWarnings("deprecation")
    private byte[] buildData(Message message) throws IOException {
        List<ByteString> rowEntries = message.getRawEntries();
        // message size
        int messageSize = 0;
        messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());

        int dataSize = 0;
        for (ByteString rowEntry : rowEntries) {
            dataSize += CodedOutputStream.computeBytesSizeNoTag(rowEntry);
        }
        messageSize += dataSize;
        messageSize += 1 * rowEntries.size();
        // packet size
        int size = 0;
        size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
        size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
                + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
        // TODO recyle bytes[]
        byte[] body = new byte[size];
        CodedOutputStream output = CodedOutputStream.newInstance(body);
        output.writeEnum(3, PacketType.MESSAGES.getNumber());

        output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
        output.writeRawVarint32(messageSize);
        // message
        output.writeInt64(1, message.getId());
        for (ByteString rowEntry : rowEntries) {
            output.writeBytes(2, rowEntry);
        }
        output.checkNoSpaceLeft();

        return body;
    }
}