Back to Repositories

Testing EventTransactionBuffer Implementation in Alibaba Canal

This test suite validates the EventTransactionBuffer functionality in Alibaba Canal, focusing on transaction buffering and flushing mechanisms. The tests ensure proper handling of database transaction events and buffer management within the Canal parsing system.

Test Coverage Overview

The test suite provides comprehensive coverage of the EventTransactionBuffer component:

  • Transaction flush behavior with configurable buffer sizes
  • Force flush functionality when buffer limit is reached
  • Transaction begin/end event handling
  • Entry header processing and timestamp management

Implementation Analysis

The testing approach utilizes JUnit to validate buffer operations:

Implements systematic verification of buffer mechanics through two main test cases – testTransactionFlush() and testForceFlush(). Uses mock Entry objects with configurable parameters to simulate real database transaction events.

  • Transaction size verification
  • Buffer capacity management
  • Callback execution validation

Technical Details

  • JUnit 4 testing framework
  • Custom Entry builder methods for test data generation
  • MessageFormat for log message formatting
  • SimpleDateFormat for timestamp handling
  • Assert statements for validation

Best Practices Demonstrated

The test implementation showcases several testing best practices:

  • Isolated test methods with clear responsibilities
  • Proper setup and teardown of buffer resources
  • Comprehensive error handling with Assert.fail()
  • Consistent test data generation patterns
  • Clear separation of test scenarios

alibaba/canal

parse/src/test/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBufferTest.java

            
package com.alibaba.otter.canal.parse.inbound;

import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.junit.Assert;
import org.junit.Test;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;

public class EventTransactionBufferTest {

    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String messgae     = "{0} [{1}:{2}:{3}] {4}.{5}";

    @Test
    public void testTransactionFlush() {
        final int bufferSize = 64;
        final int transactionSize = 5;
        EventTransactionBuffer buffer = new EventTransactionBuffer();
        buffer.setBufferSize(bufferSize);
        buffer.setFlushCallback(transaction -> {
            Assert.assertEquals(transactionSize, transaction.size());
            System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
            for (Entry data : transaction) {

                Header header = data.getHeader();
                Date date = new Date(header.getExecuteTime());
                SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
                if (data.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || data.getEntryType() == EntryType.TRANSACTIONEND) {
                    System.out.println(data.getEntryType());

                } else {
                    System.out.println(MessageFormat.format(messgae, new Object[] {
                            Thread.currentThread().getName(), header.getLogfileName(), header.getLogfileOffset(),
                            format.format(date), header.getSchemaName(), header.getTableName() }));
                }

            }
            System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
        });
        buffer.start();

        try {
            for (int i = 0; i < transactionSize * 10; i++) {
                if (i % transactionSize == 0) {
                    buffer.add(buildEntry("1", 1L + i, 40L + i, EntryType.TRANSACTIONBEGIN));
                } else if ((i + 1) % transactionSize == 0) {
                    buffer.add(buildEntry("1", 1L + i, 40L + i, EntryType.TRANSACTIONEND));
                } else {
                    buffer.add(buildEntry("1", 1L + i, 40L + i));
                }
            }
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }

        buffer.stop();
    }

    @Test
    public void testForceFlush() {
        final int bufferSize = 64;
        EventTransactionBuffer buffer = new EventTransactionBuffer();
        buffer.setBufferSize(bufferSize);
        buffer.setFlushCallback(transaction -> {
            Assert.assertEquals(bufferSize, transaction.size());
            System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
            for (Entry data : transaction) {

                Header header = data.getHeader();
                Date date = new Date(header.getExecuteTime());
                SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
                if (data.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || data.getEntryType() == EntryType.TRANSACTIONEND) {
                    // System.out.println(MessageFormat.format(messgae, new
                    // Object[] {
                    // Thread.currentThread().getName(),
                    // header.getLogfilename(), header.getLogfileoffset(),
                    // format.format(date),
                    // data.getEntry().getEntryType(), "" }));
                    System.out.println(data.getEntryType());

                } else {
                    System.out.println(MessageFormat.format(messgae, new Object[] {
                            Thread.currentThread().getName(), header.getLogfileName(), header.getLogfileOffset(),
                            format.format(date), header.getSchemaName(), header.getTableName() }));
                }

            }
            System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
        });
        buffer.start();

        try {
            for (int i = 0; i < bufferSize * 2 + 1; i++) {
                buffer.add(buildEntry("1", 1L + i, 40L + i));
            }
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }

        buffer.stop();
    }

    private static Entry buildEntry(String binlogFile, long offset, long timestamp) {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName(binlogFile);
        headerBuilder.setLogfileOffset(offset);
        headerBuilder.setExecuteTime(timestamp);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        return entryBuilder.build();
    }

    private static Entry buildEntry(String binlogFile, long offset, long timestamp, EntryType type) {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName(binlogFile);
        headerBuilder.setLogfileOffset(offset);
        headerBuilder.setExecuteTime(timestamp);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        entryBuilder.setEntryType(type);
        return entryBuilder.build();
    }
}