Back to Repositories

Testing Label Synchronization with Sub-Query Operations in Canal

This test suite validates the synchronization functionality between a MySQL database and Elasticsearch 6.x for label data with complex join operations and sub-queries. It focuses on testing the ES6xAdapter implementation for handling label relationships and data transformations.

Test Coverage Overview

The test suite provides comprehensive coverage of label synchronization operations with sub-queries:
  • Insert operations with function-based sub-queries
  • Update scenarios for label modifications
  • Delete operations and their impact on related data
  • Label concatenation and formatting validation

Implementation Analysis

The testing approach utilizes JUnit framework to validate the ES6xAdapter functionality. It implements a systematic pattern of setup, execution, and verification for each operation type:
  • Common initialization through @Before setup
  • Transaction simulation using DML objects
  • Direct database operations via DataSource
  • Elasticsearch response validation

Technical Details

Testing infrastructure includes:
  • JUnit 4 testing framework
  • ES6xAdapter for Elasticsearch 6.x operations
  • Custom DML implementation for data manipulation
  • DataSource configuration for database operations
  • GetResponse verification for Elasticsearch queries

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Isolated test cases with clear setup and teardown
  • Comprehensive assertion checks for data integrity
  • Proper exception handling and resource management
  • Clear test method naming and documentation
  • Modular test structure with reusable components

alibaba/canal

client-adapter/es6x/src/test/java/com/alibaba/otter/canal/client/adapter/es6x/test/sync/LabelSyncJoinSub2Test.java

            
package com.alibaba.otter.canal.client.adapter.es6x.test.sync;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.elasticsearch.action.get.GetResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es6x.ES6xAdapter;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;

@Ignore
public class LabelSyncJoinSub2Test {

    private ES6xAdapter esAdapter;

    @Before
    public void init() {
        // AdapterConfigs.put("es", "mytest_user_join_sub2.yml");
        esAdapter = Common.init();
    }

    /**
     * 带函数子查询从表插入
     */
    @Test
    public void test01() {
        DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
        Common.sqlExe(ds, "delete from label where id=1 or id=2");
        Common.sqlExe(ds, "insert into label (id,user_id,label) values (1,1,'a')");
        Common.sqlExe(ds, "insert into label (id,user_id,label) values (2,1,'b')");

        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setTs(new Date().getTime());
        dml.setType("INSERT");
        dml.setDatabase("mytest");
        dml.setTable("label");
        List<Map<String, Object>> dataList = new ArrayList<>();
        Map<String, Object> data = new LinkedHashMap<>();
        dataList.add(data);
        data.put("id", 2L);
        data.put("user_id", 1L);
        data.put("label", "b");
        dml.setData(dataList);

        String database = dml.getDatabase();
        String table = dml.getTable();
        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);

        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);

        GetResponse response = esAdapter.getEsConnection()
            .getTransportClient()
            .prepareGet("mytest_user", "_doc", "1")
            .get();
        Assert.assertEquals("b;a_", response.getSource().get("_labels"));
    }

    /**
     * 带函数子查询从表更新
     */
    @Test
    public void test02() {
        DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
        Common.sqlExe(ds, "update label set label='aa' where id=1");

        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setTs(new Date().getTime());
        dml.setType("UPDATE");
        dml.setDatabase("mytest");
        dml.setTable("label");
        List<Map<String, Object>> dataList = new ArrayList<>();
        Map<String, Object> data = new LinkedHashMap<>();
        dataList.add(data);
        data.put("id", 1L);
        data.put("user_id", 1L);
        data.put("label", "aa");
        dml.setData(dataList);

        List<Map<String, Object>> oldList = new ArrayList<>();
        Map<String, Object> old = new LinkedHashMap<>();
        oldList.add(old);
        old.put("label", "v");
        dml.setOld(oldList);

        String database = dml.getDatabase();
        String table = dml.getTable();
        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);

        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);

        GetResponse response = esAdapter.getEsConnection()
            .getTransportClient()
            .prepareGet("mytest_user", "_doc", "1")
            .get();
        Assert.assertEquals("b;aa_", response.getSource().get("_labels"));
    }

    /**
     * 带函数子查询从表删除
     */
    @Test
    public void test03() {
        DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
        Common.sqlExe(ds, "delete from label where id=1");

        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setTs(new Date().getTime());
        dml.setType("DELETE");
        dml.setDatabase("mytest");
        dml.setTable("label");
        List<Map<String, Object>> dataList = new ArrayList<>();
        Map<String, Object> data = new LinkedHashMap<>();
        dataList.add(data);
        data.put("id", 1L);
        data.put("user_id", 1L);
        data.put("label", "a");
        dml.setData(dataList);

        String database = dml.getDatabase();
        String table = dml.getTable();
        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);

        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);

        GetResponse response = esAdapter.getEsConnection()
            .getTransportClient()
            .prepareGet("mytest_user", "_doc", "1")
            .get();
        Assert.assertEquals("b_", response.getSource().get("_labels"));
    }
}