Back to Repositories

Testing MySQL Binlog Event Performance Processing in Canal

This performance test suite evaluates MySQL binlog event processing capabilities in the Canal framework. It measures throughput and efficiency of binlog event decoding while monitoring real-time TPS (Transactions Per Second) metrics for large-scale data processing scenarios.

Test Coverage Overview

The test suite provides comprehensive coverage of MySQL binlog event processing performance.

Key areas tested include:
  • Binlog dump command execution and streaming
  • Event decoding performance at scale
  • TPS (Transactions Per Second) measurement
  • Connection handling and settings management
Edge cases address large volume event processing and extended runtime scenarios.

Implementation Analysis

The testing approach employs a direct log fetcher pattern with real-time performance monitoring. Implementation leverages MySQL protocol-specific components including MysqlConnector and LogDecoder.

Technical patterns include:
  • Atomic counters for thread-safe event tracking
  • Batch-based performance calculation
  • Resource management via try-with-resources
  • Custom packet handling for binlog streaming

Technical Details

Testing infrastructure includes:
  • DirectLogFetcher for raw binlog access
  • LogDecoder for binary log parsing
  • MysqlConnector for database connectivity
  • Custom packet management utilities
Configuration includes:
  • Binlog checksum settings
  • MariaDB slave capability parameters
  • UTF-8 character encoding
  • Local MySQL instance (127.0.0.1:3306)

Best Practices Demonstrated

The test implementation showcases several performance testing best practices.

Notable practices include:
  • Proper resource cleanup with AutoCloseable
  • Modular method organization for setup and execution
  • Regular performance metric reporting
  • Configurable test parameters
  • Exception handling and logging

alibaba/canal

parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java

            
package com.alibaba.otter.canal.parse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import org.junit.Ignore;

@Ignore
public class MysqlBinlogEventPerformanceTest {

    protected static Charset charset = Charset.forName("utf-8");

    public static void main(String args[]) {
        try (DirectLogFetcher fetcher = new DirectLogFetcher()) {
            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
            connector.connect();
            updateSettings(connector);
            sendBinlogDump(connector, "mysql-bin.000006", 120L, 3);
            fetcher.start(connector.getChannel());
            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            AtomicLong sum = new AtomicLong(0);
            long start = System.currentTimeMillis();
            long last = 0;
            long end = 0;
            while (fetcher.fetch()) {
                decoder.decode(fetcher, context);
                sum.incrementAndGet();
                long current = sum.get();
                if (current - last >= 100000) {
                    end = System.currentTimeMillis();
                    long tps = ((current - last) * 1000) / (end - start);
                    System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
                    last = current;
                    start = end;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
                                                                                                                         throws IOException {
        BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
        binlogDumpCmd.binlogFileName = binlogfilename;
        binlogDumpCmd.binlogPosition = binlogPosition;
        binlogDumpCmd.slaveServerId = slaveId;
        byte[] cmdBody = binlogDumpCmd.toBytes();

        HeaderPacket binlogDumpHeader = new HeaderPacket();
        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
    }

    private static void updateSettings(MysqlConnector connector) throws IOException {
        update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
        update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector);
    }

    public static void update(String cmd, MysqlConnector connector) throws IOException {
        MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
        exector.update(cmd);
    }

}