Back to Repositories

Testing Local Binlog Event Processing in Canal

This test suite validates the local binary log parsing functionality in Canal, focusing on processing MySQL binlog events from a local directory. It implements comprehensive testing of event parsing, transaction handling, and data change capture capabilities.

Test Coverage Overview

The test suite provides coverage for local binlog event parsing with specific focus on:

  • Binary log file reading and parsing
  • Transaction begin/end event handling
  • Row-level change detection (INSERT, UPDATE, DELETE)
  • Column value and metadata extraction
  • Position tracking and persistence

Implementation Analysis

The testing approach utilizes JUnit framework to validate the LocalBinlogEventParser component. It implements a mock event sink and position manager to verify proper event processing and position tracking.

Key patterns include:
  • Event sink implementation for capturing and validating parsed events
  • Custom position management for tracking binlog positions
  • Charset and authentication configuration testing

Technical Details

Testing infrastructure includes:

  • JUnit 4 testing framework
  • Mock implementations of CanalEventSink and LogPositionManager
  • Local binlog file resources in test directory
  • UTF-8 charset configuration
  • Simulated MySQL authentication credentials

Best Practices Demonstrated

The test implementation showcases several testing best practices:

  • Comprehensive event type handling validation
  • Proper resource cleanup and controller lifecycle management
  • Clear separation of setup, execution, and verification phases
  • Detailed logging and error handling verification
  • Mock component integration for isolated testing

alibaba/canal

parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogDumpTest.java

            
package com.alibaba.otter.canal.parse.inbound.mysql;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
@Ignore
public class LocalBinlogDumpTest {

    @Test
    public void testSimple() {
        String directory = "/Users/wanshao/projects/canal/parse/src/test/resources/binlog/tsdb";
        final LocalBinlogEventParser controller = new LocalBinlogEventParser();
        final EntryPosition startPosition = new EntryPosition("mysql-bin.000003", 123L);

        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"));
        controller.setConnectionCharsetStd(Charset.forName("UTF-8"));
        controller.setDirectory(directory);
        controller.setMasterPosition(startPosition);
        controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {

            public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                        throws CanalSinkException,
                                                                                                        InterruptedException {

                for (Entry entry : entrys) {
                    if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                        continue;
                    }

                    if (entry.getEntryType() == EntryType.ROWDATA) {
                        RowChange rowChange = null;
                        try {
                            rowChange = RowChange.parseFrom(entry.getStoreValue());
                        } catch (Exception e) {
                            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
                                                       + entry.toString(), e);
                        }

                        EventType eventType = rowChange.getEventType();
                        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                            entry.getHeader().getLogfileName(),
                            entry.getHeader().getLogfileOffset(),
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType));

                        for (RowData rowData : rowChange.getRowDatasList()) {
                            if (eventType == EventType.DELETE) {
                                print(rowData.getBeforeColumnsList());
                            } else if (eventType == EventType.INSERT) {
                                print(rowData.getAfterColumnsList());
                            } else {
                                System.out.println("-------> before");
                                print(rowData.getBeforeColumnsList());
                                System.out.println("-------> after");
                                print(rowData.getAfterColumnsList());
                            }
                        }
                    }
                }

                return true;
            }

        });
        controller.setLogPositionManager(new AbstractLogPositionManager() {

            @Override
            public LogPosition getLatestIndexBy(String destination) {
                return null;
            }

            @Override
            public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
                System.out.println(logPosition);
            }
        });

        controller.start();

        try {
            Thread.sleep(100 * 1000L);
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }
        controller.stop();
    }

    private void print(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}