Back to Repositories

Testing MySQL Binlog Event Processing Implementation in Canal

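DirectLogFetcherTest is a test class for validating MySQL binlog event processing in the Alibaba Canal project. It focuses on testing the direct log fetching mechanism and various binlog event handling capabilities. Note that the class is annotated with @Ignore and its single test method connects to a MySQL server at 127.0.0.1:3306, so it behaves as a manually run integration test rather than an isolated unit test.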

Test Coverage Overview

The test suite provides extensive coverage of MySQL binlog event processing functionality. Key areas tested include:

  • Binlog connection and fetching mechanisms
  • Multiple event type parsing including WRITE, UPDATE, DELETE operations
  • Transaction payload processing
  • MariaDB-specific event handling
  • Error handling and resource cleanup

Implementation Analysis

The testing approach uses the JUnit framework with a systematic event-processing workflow. The implementation uses a switch statement to dispatch on the event type, with a dedicated parsing method for each event category. The test relies on JDBC connectivity to interact with MySQL and is responsible for releasing the resources it opens.

Technical Details

Testing tools and configuration include:

  • JUnit test framework
  • MySQL JDBC driver integration
  • Custom LogDecoder and LogContext implementations
  • MariaDB slave capability configuration
  • Binlog checksum verification

Best Practices Demonstrated

The test demonstrates several testing best practices:

  • Proper exception handling and resource cleanup
  • Modular event processing architecture
  • Comprehensive event type coverage
  • Clear separation of setup and processing logic
  • Effective use of inheritance for base test functionality

alibaba/canal

dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

            
package com.taobao.tddl.dbsync.binlog;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;

import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import com.taobao.tddl.dbsync.binlog.event.*;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;

/**
 * Manual integration test that streams binlog events from a live MySQL server
 * through {@link DirectLogFetcher} and dispatches each decoded event to a parse
 * helper.
 *
 * <p>The class is {@code @Ignore}d: it needs a server reachable at
 * {@code 127.0.0.1:3306} with the credentials and binlog file hard-coded below.
 * The {@code parse*Event} helpers and the {@code binlogFileName} field are not
 * declared in this class; presumably they are inherited from
 * {@link BaseLogFetcherTest} - NOTE(review): confirm.
 */
@Ignore
public class DirectLogFetcherTest extends BaseLogFetcherTest {

    /**
     * Opens a JDBC connection, sets the session variables used for the dump,
     * then fetches and decodes events until {@code fetch()} returns false.
     *
     * <p>Any failure (including one raised while closing resources) fails the
     * test with the original throwable attached as the cause, so the stack trace
     * is reported by the test runner instead of being printed to stderr.
     */
    @Test
    public void testSimple() {
        DirectLogFetcher fetcher = new DirectLogFetcher();
        Connection connection = null;
        // True once open() has returned; from then on the fetcher is presumed to
        // own (and close) the connection - NOTE(review): confirm in DirectLogFetcher.
        boolean handedToFetcher = false;
        Throwable failure = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "123456");

            // User variables are session-scoped, so the statement can be closed
            // as soon as both SETs have run.
            try (Statement statement = connection.createStatement()) {
                // NOTE(review): the value is a quoted string literal, so the variable
                // receives the text '@@global.binlog_checksum' rather than the server
                // setting - confirm whether the unquoted form was intended.
                statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
                statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
            }

            fetcher.open(connection, "mysql-bin.000002", 4L, 1);
            handedToFetcher = true;

            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            while (fetcher.fetch()) {
                LogEvent event = decoder.decode(fetcher, context);
                processEvent(event, decoder, context);
            }
        } catch (Throwable e) {
            failure = e;
        } finally {
            try {
                fetcher.close();
            } catch (IOException e) {
                // Keep the first failure as primary; never let cleanup mask it.
                failure = chain(failure, e);
            } finally {
                // Only close the connection ourselves when open() never completed;
                // closing an already-closed JDBC connection is a no-op.
                if (!handedToFetcher && connection != null) {
                    try {
                        connection.close();
                    } catch (java.sql.SQLException e) {
                        failure = chain(failure, e);
                    }
                }
            }
        }

        if (failure instanceof AssertionError) {
            throw (AssertionError) failure;
        }
        if (failure != null) {
            throw new AssertionError(failure.getMessage(), failure);
        }
    }

    /**
     * Combines a cleanup failure with an earlier one.
     *
     * @param primary the first failure seen, or {@code null} if none yet
     * @param later   a failure raised afterwards
     * @return {@code later} when there was no earlier failure; otherwise
     *         {@code primary} with {@code later} recorded as suppressed
     */
    private static Throwable chain(Throwable primary, Throwable later) {
        if (primary == null) {
            return later;
        }
        primary.addSuppressed(later);
        return primary;
    }

    /**
     * Dispatches one decoded event to the matching parse helper based on the
     * type reported by its header. Event types without a case are ignored.
     *
     * @param event   the decoded event; its header type selects the branch
     * @param decoder decoder used to expand a transaction payload event
     * @param context decoding context passed through to the decoder
     * @throws Throwable whatever the parse helpers or the decoder throw
     */
    public void processEvent(LogEvent event, LogDecoder decoder, LogContext context) throws Throwable {
        int eventType = event.getHeader().getType();
        switch (eventType) {
            // Both event kinds carry a file name, which is recorded in binlogFileName.
            case LogEvent.ROTATE_EVENT:
                binlogFileName = ((RotateLogEvent) event).getFilename();
                break;
            case LogEvent.BINLOG_CHECKPOINT_EVENT:
                binlogFileName = ((BinlogCheckPointLogEvent) event).getFilename();
                break;
            case LogEvent.WRITE_ROWS_EVENT_V1:
            case LogEvent.WRITE_ROWS_EVENT:
                parseRowsEvent((WriteRowsLogEvent) event);
                break;
            case LogEvent.UPDATE_ROWS_EVENT_V1:
            case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
            case LogEvent.UPDATE_ROWS_EVENT:
                parseRowsEvent((UpdateRowsLogEvent) event);
                break;
            case LogEvent.DELETE_ROWS_EVENT_V1:
            case LogEvent.DELETE_ROWS_EVENT:
                parseRowsEvent((DeleteRowsLogEvent) event);
                break;
            case LogEvent.QUERY_EVENT:
                parseQueryEvent((QueryLogEvent) event);
                break;
            case LogEvent.ROWS_QUERY_LOG_EVENT:
                parseRowsQueryEvent((RowsQueryLogEvent) event);
                break;
            case LogEvent.ANNOTATE_ROWS_EVENT:
                parseAnnotateRowsEvent((AnnotateRowsEvent) event);
                break;
            case LogEvent.XID_EVENT:
                parseXidEvent((XidLogEvent) event);
                break;
            case LogEvent.TRANSACTION_PAYLOAD_EVENT: {
                // The decoder returns a list of events for the payload; each one
                // is fed back through this method.
                List<LogEvent> innerEvents = decoder.processIterateDecode(event, context);
                for (LogEvent innerEvent : innerEvents) {
                    processEvent(innerEvent, decoder, context);
                }
                break;
            }
            default:
                break;
        }
    }
}