Back to Repositories

Testing Elasticsearch 6.x Client Adapter Operations in Canal

This test suite validates Elasticsearch 6.x client adapter functionality in Canal, focusing on CRUD operations and document management. It tests the integration between Canal and Elasticsearch through the TransportClient implementation.

Test Coverage Overview

The test suite provides comprehensive coverage of Elasticsearch operations including:

  • Document search and retrieval operations
  • Bulk document insertion with routing
  • Document updates with partial field modifications
  • Document deletion operations
  • Error handling and response validation

Implementation Analysis

The tests use the JUnit 4 framework with a TransportClient connection to Elasticsearch. Each test case exercises a specific ES operation, and the @Before and @After methods open and close the client connection around every test. The implementation demonstrates connection handling and bulk operation patterns.

Technical Details

Key technical components include:

  • Elasticsearch 6.x TransportClient
  • JUnit 4 testing framework
  • Custom TestConstant configuration
  • BulkRequestBuilder for batch operations
  • SearchResponse and BulkResponse handlers

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test isolation and resource cleanup
  • Structured error handling and validation
  • Consistent use of bulk operations for performance
  • Clear separation of setup, execution, and teardown phases

alibaba/canal

client-adapter/es6x/src/test/java/com/alibaba/otter/canal/client/adapter/es6x/test/ESTest.java

            
package com.alibaba.otter.canal.client.adapter.es6x.test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Manual smoke tests for Elasticsearch 6.x operations issued through a
 * {@link TransportClient}: search, bulk index, bulk partial update and bulk delete.
 * <p>
 * The class is annotated with {@link Ignore}, so it is skipped by default; it needs a
 * reachable cluster configured through {@code TestConstant.clusterName} and
 * {@code TestConstant.esHosts}. The tests contain no assertions and only print results
 * or bulk failures to standard output.
 */
@Ignore
public class ESTest {

    /** Index targeted by every request in this class. */
    private static final String INDEX     = "test";

    /** Mapping type targeted by every request in this class. */
    private static final String TYPE      = "osm";

    /** Id of the document that is indexed, updated and deleted by the bulk tests. */
    private static final String DOC_ID    = "2_4";

    /** Value used both as the join parent and as the routing key of {@link #DOC_ID}. */
    private static final String PARENT_ID = "2";

    private TransportClient     transportClient;

    /**
     * Builds the transport client and registers every address listed in
     * {@code TestConstant.esHosts} (comma separated {@code host:port} entries).
     *
     * @throws UnknownHostException if a host name cannot be resolved
     * @throws IllegalArgumentException if an entry is not of the form {@code host:port}
     */
    @Before
    public void init() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", TestConstant.clusterName).build();
        transportClient = new PreBuiltTransportClient(settings);
        for (String entry : TestConstant.esHosts.split(",")) {
            String hostAndPort = entry.trim();
            if (hostAndPort.isEmpty()) {
                // Tolerate stray separators such as "a:9300,,b:9300".
                continue;
            }
            transportClient.addTransportAddress(parseAddress(hostAndPort));
        }
    }

    /**
     * Searches for the document with {@code _id} 1 and prints the runtime class of its
     * {@code data} field, or {@code null} when the field is absent from a hit.
     */
    @Test
    public void test01() {
        SearchResponse response = transportClient.prepareSearch(INDEX)
            .setTypes(TYPE)
            .setQuery(QueryBuilders.termQuery("_id", "1"))
            .setSize(10000)
            .get();
        for (SearchHit hit : response.getHits()) {
            Object data = hit.getSourceAsMap().get("data");
            // Guard against hits without a "data" field instead of failing with an NPE.
            System.out.println(data == null ? null : data.getClass());
        }
    }

    /** Indexes the child event document through a bulk request, routed by its parent id. */
    @Test
    public void test02() {
        Map<String, Object> esFieldData = new LinkedHashMap<>();
        esFieldData.put("userId", 2L);
        esFieldData.put("eventId", 4L);
        esFieldData.put("eventName", "网络异常");
        esFieldData.put("description", "第四个事件信息");
        esFieldData.put("user_event", eventRelation());

        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        bulkRequestBuilder
            .add(transportClient.prepareIndex(INDEX, TYPE, DOC_ID).setRouting(PARENT_ID).setSource(esFieldData));
        commit(bulkRequestBuilder);
    }

    /** Partially updates the event document through a bulk request, routed by its parent id. */
    @Test
    public void test03() {
        Map<String, Object> esFieldData = new LinkedHashMap<>();
        esFieldData.put("userId", 2L);
        esFieldData.put("eventName", "网络异常1");
        esFieldData.put("user_event", eventRelation());

        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        bulkRequestBuilder
            .add(transportClient.prepareUpdate(INDEX, TYPE, DOC_ID).setRouting(PARENT_ID).setDoc(esFieldData));
        commit(bulkRequestBuilder);
    }

    /** Deletes the event document through a bulk request. */
    @Test
    public void test04() {
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        // The document is indexed and updated with a routing value, so the delete has to
        // carry the same routing to address the shard that actually holds it.
        bulkRequestBuilder.add(transportClient.prepareDelete(INDEX, TYPE, DOC_ID).setRouting(PARENT_ID));
        commit(bulkRequestBuilder);
    }

    /**
     * Executes the bulk request if it contains at least one action and prints the failure
     * message of every failed item. NOT_FOUND failures are printed as-is; all other
     * failures are prefixed with "ES bulk commit error".
     *
     * @param bulkRequestBuilder the bulk request to execute
     */
    private void commit(BulkRequestBuilder bulkRequestBuilder) {
        if (bulkRequestBuilder.numberOfActions() <= 0) {
            return;
        }

        BulkResponse response = bulkRequestBuilder.execute().actionGet();
        if (!response.hasFailures()) {
            return;
        }

        for (BulkItemResponse itemResponse : response.getItems()) {
            if (!itemResponse.isFailed()) {
                continue;
            }

            if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
                System.out.println(itemResponse.getFailureMessage());
            } else {
                System.out.println("ES bulk commit error" + itemResponse.getFailureMessage());
            }
        }
    }

    /** Closes the transport client if {@link #init()} managed to create it. */
    @After
    public void after() {
        // @After also runs when @Before failed; avoid masking that failure with an NPE.
        if (transportClient != null) {
            transportClient.close();
        }
    }

    /** Builds the {@code user_event} join value shared by the index and update tests. */
    private static Map<String, Object> eventRelation() {
        Map<String, Object> relations = new LinkedHashMap<>();
        relations.put("name", "event");
        relations.put("parent", PARENT_ID);
        return relations;
    }

    /** Converts a single {@code host:port} entry into a transport address. */
    private static TransportAddress parseAddress(String hostAndPort) throws UnknownHostException {
        int separator = hostAndPort.indexOf(":");
        if (separator < 0) {
            throw new IllegalArgumentException("Expected host:port but got '" + hostAndPort + "'");
        }
        String host = hostAndPort.substring(0, separator);
        int port = Integer.parseInt(hostAndPort.substring(separator + 1));
        return new TransportAddress(InetAddress.getByName(host), port);
    }
}