Back to Repositories

Testing DirectLogFetcher Binlog Processing in Canal

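A test class for DirectLogFetcher in Alibaba Canal that exercises MySQL binlog parsing and event handling against a live MySQL server. It covers direct log fetching, binlog event processing, and connection management. The class is annotated with @Ignore, so it is skipped by default and has to be enabled manually with a MySQL server reachable at 127.0.0.1:3306.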

Test Coverage Overview

The test suite provides extensive coverage of MySQL binlog parsing functionality:
  • Binlog event type handling and processing
  • Connection management and slave registration
  • Event parsing for different MySQL operations (INSERT/UPDATE/DELETE)
  • Charset handling and binary format processing
  • Error handling and connection cleanup

Implementation Analysis

The test is written with JUnit and talks to a real MySQL server instead of mocking the replication protocol:
  • Direct connection testing with MySQL server
  • Event parsing validation for different log types
  • Implementation of slave protocol commands
  • Comprehensive event processing pipeline testing

Technical Details

Key technical components and configurations:
  • JUnit test framework
  • MySQL protocol implementation
  • Binary log event parsing
  • Connection handling with InetSocketAddress
  • Charset and encoding management
  • Custom packet management utilities

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Comprehensive error handling and cleanup
  • Detailed event type validation
  • Systematic connection management
  • Proper resource cleanup in finally blocks
  • Thorough validation of binary data handling

alibaba/canal

parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

            
package com.alibaba.otter.canal.parse;

import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;

import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.event.*;
import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;

@Ignore
public class DirectLogFetcherTest {

    // Logger named after the runtime class; used for warnings while applying session settings.
    protected final Logger logger         = LoggerFactory.getLogger(this.getClass());
    // Binlog file name shown in the "binlog[file:pos]" labels printed by the parse* helpers.
    protected String       binlogFileName = "mysql-bin.000001";
    // Charset used to re-decode database names, table names and SQL text, and to read row buffers.
    protected Charset      charset        = Charset.forName("utf-8");
    // Checksum algorithm constant chosen by loadBinlogChecksum and passed to FormatDescriptionLogEvent.
    private int            binlogChecksum;

    /**
     * Connects to a local MySQL server, registers as a replication slave, requests a binlog dump and decodes
     * every event for as long as {@code fetcher.fetch()} keeps returning {@code true}.
     * <p>
     * Requires a MySQL instance listening on 127.0.0.1:3306 that accepts the {@code canal}/{@code canal}
     * account. Any failure while connecting, fetching or decoding is logged and reported through
     * {@link Assert#fail(String)}.
     */
    @Test
    public void testSimple() {
        DirectLogFetcher fetcher = new DirectLogFetcher();
        MysqlConnector connector = null;
        try {
            connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal");
            connector.connect();
            updateSettings(connector);
            loadBinlogChecksum(connector);
            sendRegisterSlave(connector, 3);
            sendBinlogDump(connector, "mysql-bin.000002", 4L, 3);

            fetcher.start(connector.getChannel());

            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            // seed the decoder with the checksum algorithm detected from the server
            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));

            while (fetcher.fetch()) {
                LogEvent event = decoder.decode(fetcher, context);
                if (event == null) {
                    throw new RuntimeException("parse failed");
                }
                processEvent(event, decoder, context);
            }
        } catch (Throwable e) {
            // log with the full stack trace; Assert.fail can only carry the message
            logger.error("fetch binlog from mysql failed", e);
            Assert.fail(e.getMessage());
        } finally {
            // Cleanup problems are only logged: throwing from finally would replace the
            // failure raised above and hide the real cause of a broken run.
            try {
                fetcher.close();
            } catch (IOException e) {
                logger.warn("close fetcher failed", e);
            }
            // The fetcher only owns the channel after start(); disconnect explicitly so the
            // socket is also released when the failure happened before that point.
            if (connector != null) {
                try {
                    connector.disconnect();
                } catch (IOException e) {
                    logger.warn("disconnect mysql connector failed", e);
                }
            }
        }
    }

    /**
     * Dispatches one decoded binlog event to the matching {@code parse*} helper, based on the type found in
     * the event header. Event types without a case are ignored.
     *
     * @param event the decoded event
     * @param decoder decoder used to expand a transaction payload event into its contained events
     * @param context decoding context passed back to the decoder for payload expansion
     * @throws Throwable whatever the invoked helper or the decoder throws
     */
    private void processEvent(LogEvent event, LogDecoder decoder, LogContext context) throws Throwable {
        int eventType = event.getHeader().getType();
        switch (eventType) {
            case LogEvent.ROTATE_EVENT:
                // track the file currently being streamed so the positions printed by the
                // parse* helpers are labelled with the right binlog file
                binlogFileName = ((RotateLogEvent) event).getFilename();
                System.out.println("RotateLogEvent : " + binlogFileName);
                break;
            case LogEvent.BINLOG_CHECKPOINT_EVENT:
                // NOTE(review): deliberately not copied into binlogFileName - a MariaDB checkpoint
                // presumably names a file needed for recovery rather than the one being read; confirm.
                System.out.println("BinlogCheckPointLogEvent : " + ((BinlogCheckPointLogEvent) event).getFilename());
                break;
            case LogEvent.TABLE_MAP_EVENT:
                parseTableMapEvent((TableMapLogEvent) event);
                break;
            case LogEvent.WRITE_ROWS_EVENT_V1:
            case LogEvent.WRITE_ROWS_EVENT:
                parseRowsEvent((WriteRowsLogEvent) event);
                break;
            case LogEvent.UPDATE_ROWS_EVENT_V1:
            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
            case LogEvent.UPDATE_ROWS_EVENT:
                parseRowsEvent((UpdateRowsLogEvent) event);
                break;
            case LogEvent.DELETE_ROWS_EVENT_V1:
            case LogEvent.DELETE_ROWS_EVENT:
                parseRowsEvent((DeleteRowsLogEvent) event);
                break;
            case LogEvent.QUERY_EVENT:
                parseQueryEvent((QueryLogEvent) event);
                break;
            case LogEvent.ROWS_QUERY_LOG_EVENT:
                parseRowsQueryEvent((RowsQueryLogEvent) event);
                break;
            case LogEvent.ANNOTATE_ROWS_EVENT:
                // previously dropped silently although a dedicated helper exists
                parseAnnotateRowsEvent((AnnotateRowsEvent) event);
                break;
            case LogEvent.XID_EVENT:
                // previously dropped silently although a dedicated helper exists
                parseXidEvent((XidLogEvent) event);
                break;
            case LogEvent.TRANSACTION_PAYLOAD_EVENT:
                // a payload event wraps other events: expand it and handle each one recursively
                List<LogEvent> events = decoder.processIterateDecode(event, context);
                for (LogEvent deEvent : events) {
                    processEvent(deEvent, decoder, context);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Sends a register-slave command over the connector's channel and checks the server's reply.
     *
     * @param connector connected MySQL connector whose address and credentials are reported to the server
     * @param slaveId server id to register under
     * @throws IOException if the reply's first byte is negative: an error packet when it is -1, otherwise an
     *             unexpected packet; also on any channel read/write failure
     */
    private void sendRegisterSlave(MysqlConnector connector, int slaveId) throws IOException {
        RegisterSlaveCommandPacket register = new RegisterSlaveCommandPacket();
        register.serverId = slaveId;
        register.reportUser = connector.getUsername();
        register.reportPasswd = connector.getPassword();
        register.reportHost = connector.getAddress().getAddress().getHostAddress();
        final byte[] payload = register.toBytes();

        HeaderPacket requestHeader = new HeaderPacket();
        requestHeader.setPacketSequenceNumber((byte) 0x00);
        requestHeader.setPacketBodyLength(payload.length);
        PacketManager.writePkg(connector.getChannel(), requestHeader.toBytes(), payload);

        // read the reply: header first, then as many body bytes as the header announces
        HeaderPacket replyHeader = PacketManager.readHeader(connector.getChannel(), 4);
        byte[] body = PacketManager.readBytes(connector.getChannel(), replyHeader.getPacketBodyLength());
        assert body != null;

        final byte fieldCount = body[0];
        if (fieldCount == -1) {
            ErrorPacket err = new ErrorPacket();
            err.fromBytes(body);
            throw new IOException("Error When doing Register slave:" + err.toString());
        }
        if (fieldCount < 0) {
            throw new IOException("Unexpected packet with field_count=" + fieldCount);
        }
    }

    /**
     * Writes a binlog-dump command to the connector's channel. No reply is read here.
     *
     * @param connector connected MySQL connector providing the channel
     * @param binlogfilename binlog file to start dumping from
     * @param binlogPosition offset inside that file to start from
     * @param slaveId server id this client identifies itself with
     * @throws IOException if writing the packet fails
     */
    private void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
        throws IOException {
        BinlogDumpCommandPacket dump = new BinlogDumpCommandPacket();
        dump.slaveServerId = slaveId;
        dump.binlogPosition = binlogPosition;
        dump.binlogFileName = binlogfilename;
        final byte[] payload = dump.toBytes();

        HeaderPacket header = new HeaderPacket();
        header.setPacketSequenceNumber((byte) 0x00);
        header.setPacketBodyLength(payload.length);
        PacketManager.writePkg(connector.getChannel(), header.toBytes(), payload);
    }

    /**
     * Applies the session settings used before requesting a binlog dump. Each statement is attempted
     * independently; a failure is logged as a warning and the remaining statements still run.
     *
     * @param connector connected MySQL connector the statements are executed on
     * @throws IOException declared for callers; failures of the individual statements are caught here
     */
    private void updateSettings(MysqlConnector connector) throws IOException {
        updateOrWarn(connector, "set wait_timeout=9999999", "update wait_timeout failed");
        updateOrWarn(connector, "set net_write_timeout=7200", "update net_write_timeout failed");
        updateOrWarn(connector, "set net_read_timeout=7200", "update net_read_timeout failed");

        // Ask the server not to transcode results: data is sent in the database's binary
        // encoding and the client converts it as needed.
        updateOrWarn(connector, "set names 'binary'", "update names failed");

        // MySQL 5.6 checksum support requires this session variable. Without it the server reports:
        // "Slave can not handle replication events with the checksum that master is configured to log".
        // It must match the server's own checksum configuration, otherwise RotateLogEvent is garbled.
        updateOrWarn(connector,
            "set @master_binlog_checksum= @@global.binlog_checksum",
            "update master_binlog_checksum failed");

        try {
            // See https://github.com/alibaba/canal/issues/284
            // MySQL 5.6 needs slave_uuid to be set so the server does not kill the connection.
            update("set @slave_uuid=uuid()", connector);
        } catch (Exception e) {
            // these two messages are tolerated silently; anything else is worth a warning
            boolean tolerated = StringUtils.contains(e.getMessage(), "Unknown system variable")
                                || StringUtils.contains(e.getMessage(), "slave_uuid can't be set");
            if (!tolerated) {
                logger.warn("update slave_uuid failed", e);
            }
        }

        // MariaDB needs this session variable for its special event types.
        updateOrWarn(connector,
            "SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'",
            "update mariadb_slave_capability failed");

        // the heartbeat period is sent in nanoseconds
        final long heartbeatNanos = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
        updateOrWarn(connector,
            "SET @master_heartbeat_period=" + heartbeatNanos,
            "update master_heartbeat_period failed");
    }

    /** Runs one update statement and logs {@code failureMessage} as a warning if it throws. */
    private void updateOrWarn(MysqlConnector connector, String sql, String failureMessage) {
        try {
            update(sql, connector);
        } catch (Exception e) {
            logger.warn(failureMessage, e);
        }
    }

    /**
     * Queries the server's global {@code binlog_checksum} setting and stores the matching algorithm constant
     * in {@code binlogChecksum}: CRC32 when the first returned value equals "CRC32" (ignoring case), OFF in
     * every other case, including an empty result or a null value.
     *
     * @param connector connected MySQL connector to run the query on
     * @throws CanalParseException if the query fails with an {@link IOException}
     */
    private void loadBinlogChecksum(MysqlConnector connector) {
        ResultSetPacket rs;
        try {
            rs = query("select @@global.binlog_checksum", connector);
        } catch (IOException e) {
            throw new CanalParseException(e);
        }

        List<String> columnValues = rs.getFieldValues();
        // Constant-first equalsIgnoreCase is null-safe (a null value no longer causes an NPE)
        // and does not depend on the default locale the way toUpperCase() does.
        boolean crc32 = columnValues != null && !columnValues.isEmpty()
                        && "CRC32".equalsIgnoreCase(columnValues.get(0));
        binlogChecksum = crc32 ? LogEvent.BINLOG_CHECKSUM_ALG_CRC32 : LogEvent.BINLOG_CHECKSUM_ALG_OFF;
    }

    /**
     * Runs a query statement through a new {@link MysqlQueryExecutor} bound to the given connector.
     *
     * @param cmd SQL text to execute
     * @param connector connector the executor is created for
     * @return the result set packet returned by the executor
     * @throws IOException if the executor fails
     */
    public ResultSetPacket query(String cmd, MysqlConnector connector) throws IOException {
        return new MysqlQueryExecutor(connector).query(cmd);
    }

    /**
     * Runs an update statement through a new {@link MysqlUpdateExecutor} bound to the given connector.
     *
     * @param cmd SQL text to execute
     * @param connector connector the executor is created for
     * @throws IOException if the executor fails
     */
    public void update(String cmd, MysqlConnector connector) throws IOException {
        new MysqlUpdateExecutor(connector).update(cmd);
    }

    /**
     * Prints the start position, catalog and SQL text of a query event to standard output.
     *
     * @param event the query event to print
     */
    protected void parseQueryEvent(QueryLogEvent event) {
        // the header's log position minus the event length gives where this event starts
        final long startPos = event.getHeader().getLogPos() - event.getHeader().getEventLen();
        final String banner = String.format("================> binlog[%s:%s] , name[%s]",
            binlogFileName,
            startPos,
            event.getCatalog());
        System.out.println(banner);
        System.out.println("sql : " + event.getQuery());
    }

    /**
     * Prints the start position and the SQL text of a rows-query event to standard output. The text is
     * turned back into bytes via ISO-8859-1 and decoded again with the configured charset.
     *
     * @param event the rows-query event to print
     * @throws Exception if one of the charset names is not supported
     */
    protected void parseRowsQueryEvent(RowsQueryLogEvent event) throws Exception {
        final long startPos = event.getHeader().getLogPos() - event.getHeader().getEventLen();
        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, startPos));

        final byte[] rawSql = event.getRowsQuery().getBytes("ISO-8859-1");
        final String sql = new String(rawSql, charset.name());
        System.out.println("sql : " + sql);
    }

    /**
     * Prints the start position and the SQL text of an annotate-rows event to standard output. The text is
     * turned back into bytes via ISO-8859-1 and decoded again with the configured charset.
     *
     * @param event the annotate-rows event to print
     * @throws Exception if one of the charset names is not supported
     */
    protected void parseAnnotateRowsEvent(AnnotateRowsEvent event) throws Exception {
        final long startPos = event.getHeader().getLogPos() - event.getHeader().getEventLen();
        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, startPos));

        final byte[] rawSql = event.getRowsQuery().getBytes("ISO-8859-1");
        final String sql = new String(rawSql, charset.name());
        System.out.println("sql : " + sql);
    }

    /**
     * Re-decodes the database and table names of a table-map event with the configured charset and writes
     * them back onto the event. Each name is turned into bytes via ISO-8859-1 first.
     *
     * @param event the table-map event whose names are rewritten in place
     * @throws CanalParseException if a charset name is not supported
     */
    public void parseTableMapEvent(TableMapLogEvent event) {
        final String targetCharset = charset.name();
        try {
            final byte[] rawDbName = event.getDbName().getBytes("ISO-8859-1");
            event.setDbname(new String(rawDbName, targetCharset));

            final byte[] rawTableName = event.getTableName().getBytes("ISO-8859-1");
            event.setTblname(new String(rawTableName, targetCharset));
        } catch (UnsupportedEncodingException e) {
            throw new CanalParseException(e);
        }
    }

    /**
     * Prints the start position and the xid of an XID event to standard output.
     *
     * @param event the XID event to print
     * @throws Exception declared for subclasses; nothing in this body throws a checked exception
     */
    protected void parseXidEvent(XidLogEvent event) throws Exception {
        final long startPos = event.getHeader().getLogPos() - event.getHeader().getEventLen();
        System.out.println(String.format("================> binlog[%s:%s]", binlogFileName, startPos));
        System.out.println("xid : " + event.getXid());
    }

    /**
     * Prints the position and table of a rows event, then walks its row buffer and prints every row through
     * {@code parseOneRow}. Write events yield one after image per row, delete events one before image, and
     * all other (update) events a before image followed by an after image.
     *
     * @param event the write/update/delete rows event to print
     * @throws RuntimeException wrapping any exception raised while reading the rows
     */
    protected void parseRowsEvent(RowsLogEvent event) {
        try {
            final long startPos = event.getHeader().getLogPos() - event.getHeader().getEventLen();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s]",
                binlogFileName,
                startPos,
                event.getTable().getDbName(),
                event.getTable().getTableName()));

            final RowsLogBuffer rows = event.getRowsBuf(charset);
            final BitSet beforeColumns = event.getColumns();
            final BitSet afterColumns = event.getChangeColumns();

            // the event type is the same for every row, so classify it once
            final int type = event.getHeader().getType();
            final boolean isInsert = LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type;
            final boolean isDelete = LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type;

            while (rows.nextOneRow(beforeColumns)) {
                if (isInsert) {
                    // insert: the row is handled as the after image
                    parseOneRow(event, rows, beforeColumns, true);
                    continue;
                }
                if (isDelete) {
                    // delete: the row is handled as the before image
                    parseOneRow(event, rows, beforeColumns, false);
                    continue;
                }

                // update: a before image followed by an after image
                System.out.println("-------> before");
                parseOneRow(event, rows, beforeColumns, false);
                if (!rows.nextOneRow(afterColumns, true)) {
                    // no after image available: stop reading this event
                    break;
                }
                System.out.println("-------> after");
                parseOneRow(event, rows, afterColumns, true);
            }
        } catch (Exception e) {
            throw new RuntimeException("parse row data failed.", e);
        }
    }

    /**
     * Reads the values of one row from the buffer and prints every non-null value to standard output.
     * Only columns whose bit is set in {@code cols} are read. Binary values are decoded with the configured
     * {@code charset} so the output does not depend on the platform default encoding.
     *
     * @param event rows event the row belongs to; supplies the table map with the column metadata
     * @param buffer row buffer positioned on the row to read
     * @param cols bit set marking which column indexes are present in this row image
     * @param isAfter whether the row is an after image; not used by this implementation
     * @throws UnsupportedEncodingException declared for subclasses; not thrown by this implementation
     * @throws RuntimeException if the event carries no table map
     */
    protected void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter)
        throws UnsupportedEncodingException {
        TableMapLogEvent map = event.getTable();
        if (map == null) {
            throw new RuntimeException("not found TableMap with tid=" + event.getTableId());
        }

        final int columnCnt = map.getColumnCnt();
        final ColumnInfo[] columnInfo = map.getColumnInfo();

        for (int i = 0; i < columnCnt; i++) {
            if (!cols.get(i)) {
                continue;
            }

            ColumnInfo info = columnInfo[i];
            buffer.nextValue(null, i, info.type, info.meta);

            if (buffer.isNull()) {
                // null columns are read from the buffer but produce no output
                continue;
            }

            final Serializable value = buffer.getValue();
            if (value instanceof byte[]) {
                // decode with the configured charset instead of the platform default
                System.out.println(new String((byte[]) value, charset));
            } else {
                System.out.println(value);
            }
        }
    }

}