Back to Repositories

Testing RDS Binlog Event Parsing Implementation in Alibaba Canal

This test suite validates the RdsBinlogEventParserProxy functionality in Alibaba Canal, focusing on binlog event parsing and timestamp handling for RDS MySQL instances. It ensures proper event detection, filtering, and position management for database change events.

Test Coverage Overview

The test suite provides comprehensive coverage of RDS binlog event parsing functionality:

  • Timestamp validation and event position tracking
  • Authentication and connection handling
  • Event filtering and blacklist configuration
  • Batch processing and file size management

Implementation Analysis

The testing approach uses the JUnit framework with mock components and time-based validation:

  • Uses TimeoutChecker for execution control
  • Uses AtomicLong for thread-safe event counting
  • Employs custom event sink implementation for validation
  • Implements position management through AbstractLogPositionManager

Technical Details

Key technical components and configurations:

  • JUnit 4 testing framework
  • TimeoutChecker helper from the com.alibaba.otter.canal.parse.helper package
  • AviaterRegexFilter for event filtering
  • Mock authentication and position management
  • Configurable batch processing parameters

Best Practices Demonstrated

The test implementation showcases several testing best practices:

  • Proper test isolation and cleanup
  • Comprehensive assertion validation
  • Clear separation of setup and verification logic
  • Effective use of helper methods for common operations
  • Proper resource management and cleanup

alibaba/canal

parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

            
package com.alibaba.otter.canal.parse.inbound.mysql;

import java.net.InetSocketAddress;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;

/**
 * Manual integration test for {@link RdsBinlogEventParserProxy}: starts the parser from a
 * timestamp-only position and checks the first non-heartbeat entry delivered to the sink.
 * <p>
 * The class is {@code @Ignore}d and the connection constants below are blank placeholders, so
 * the test only does useful work after they (and the instance id / access key / secret key set
 * in {@link #test_timestamp()}) are filled in by hand.
 *
 * @author chengjin.lyf on 2018/7/21 5:24 PM
 * @since 1.0.25
 */
@Ignore
public class RdsBinlogEventParserProxyTest {

    private static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
    // Placeholders: fill in before running the test manually.
    private static final String MYSQL_ADDRESS = "";
    private static final String USERNAME      = "";
    private static final String PASSWORD      = "";
    public static final String  DBNAME        = "";
    public static final String  TBNAME        = "";
    public static final String  DDL           = "";

    /**
     * Starts the parser at a timestamp four days in the past and verifies that the first
     * non-heartbeat entry handed to the sink lies at or before the expected binlog position
     * and at or before the requested timestamp.
     *
     * @throws InterruptedException declared by the original test; presumably raised when the
     *                              wait on the {@link TimeoutChecker} is interrupted
     */
    @Test
    public void test_timestamp() throws InterruptedException {
        // NOTE(review): the argument is presumably a timeout in milliseconds - confirm against TimeoutChecker.
        final TimeoutChecker timeoutChecker = new TimeoutChecker(3000 * 1000);
        final AtomicLong entryCount = new AtomicLong(0);
        final EntryPosition entryPosition = new EntryPosition();

        final RdsBinlogEventParserProxy controller = new RdsBinlogEventParserProxy();
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.HOUR_OF_DAY, -24 * 4);
        // Timestamp-only start position: no binlog file name and no offset.
        final EntryPosition defaultPosition = buildPosition(null, null, calendar.getTimeInMillis());
        controller.setSlaveId(3344L);
        controller.setDetectingEnable(false);
        controller.setDetectingSQL(DETECTING_SQL);
        controller.setMasterInfo(buildAuthentication());
        controller.setMasterPosition(defaultPosition);
        controller.setInstanceId("");
        controller.setAccesskey("");
        controller.setSecretkey("");
        controller.setDirectory("/tmp/binlog");
        controller.setEventBlackFilter(new AviaterRegexFilter("mysql\\.*"));
        controller.setFilterTableError(true);
        controller.setBatchFileSize(4);
        controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {

            @Override
            public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
                    throws CanalSinkException {
                for (CanalEntry.Entry entry : entrys) {
                    if (entry.getEntryType() != CanalEntry.EntryType.HEARTBEAT) {
                        // Capture the position only for the very first non-heartbeat entry.
                        // Later batches must not overwrite it, because the assertions below
                        // compare the first delivered entry with the start position.
                        if (entryCount.incrementAndGet() == 1) {
                            CanalEntry.Header header = entry.getHeader();
                            entryPosition.setJournalName(header.getLogfileName());
                            entryPosition.setPosition(header.getLogfileOffset());
                            entryPosition.setTimestamp(header.getExecuteTime());
                        }
                        // Only the first non-heartbeat entry of each batch is counted.
                        break;
                    }
                }
                return true;
            }
        });

        // In-memory position manager: keeps only the most recently persisted position.
        controller.setLogPositionManager(new AbstractLogPositionManager() {

            private LogPosition logPosition;

            public void persistLogPosition(String destination, LogPosition logPosition) {
                this.logPosition = logPosition;
            }

            public LogPosition getLatestIndexBy(String destination) {
                return logPosition;
            }
        });

        controller.start();
        try {
            timeoutChecker.waitForIdle();
        } finally {
            // Always stop the parser, even when the wait is interrupted, so a failed run
            // does not leave it running.
            if (controller.isStart()) {
                controller.stop();
            }
        }

        // check
        Assert.assertTrue(entryCount.get() > 0);

        // The first entry received must match the starting position.
        Assert.assertEquals("mysql-bin.000001", entryPosition.getJournalName());
        Assert.assertTrue(entryPosition.getPosition() <= 6163L);
        Assert.assertTrue(entryPosition.getTimestamp() <= defaultPosition.getTimestamp());
    }

    // ======================== helper method =======================

    /**
     * Builds an {@link EntryPosition} from the given parts.
     *
     * @param binlogFile binlog file name; the test passes {@code null}
     * @param offest     offset within the binlog file; the test passes {@code null}
     * @param timestamp  position timestamp, as produced by {@link Calendar#getTimeInMillis()}
     * @return a new position carrying the three values
     */
    private EntryPosition buildPosition(String binlogFile, Long offest, Long timestamp) {
        return new EntryPosition(binlogFile, offest, timestamp);
    }

    /**
     * Builds the connection settings from {@code MYSQL_ADDRESS}, port 3306, {@code USERNAME}
     * and {@code PASSWORD}.
     *
     * @return authentication info for the MySQL instance under test
     */
    private AuthenticationInfo buildAuthentication() {
        return new AuthenticationInfo(new InetSocketAddress(MYSQL_ADDRESS, 3306), USERNAME, PASSWORD);
    }
}