Testing Elasticsearch 6.x Client Adapter Operations in Canal
This test suite validates Elasticsearch 6.x client adapter functionality in Canal, focusing on CRUD operations and document management. It tests the integration between Canal and Elasticsearch through the TransportClient implementation.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
alibaba/canal
client-adapter/es6x/src/test/java/com/alibaba/otter/canal/client/adapter/es6x/test/ESTest.java
package com.alibaba.otter.canal.client.adapter.es6x.test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Manual tests that issue search, index, update and delete requests against an
 * Elasticsearch cluster through a {@link TransportClient}.
 *
 * <p>The class is annotated with {@link Ignore}, so these tests do not run by
 * default. They read the cluster name and host list from
 * {@code TestConstant.clusterName} and {@code TestConstant.esHosts}, and they
 * print results to standard output instead of asserting on them.
 */
@Ignore
public class ESTest {

    /** Index targeted by every request in this class. */
    private static final String INDEX   = "test";

    /** Mapping type targeted by every request in this class. */
    private static final String TYPE    = "osm";

    /** Id of the document that is indexed, updated and then deleted. */
    private static final String DOC_ID  = "2_4";

    /**
     * Routing value used when writing {@link #DOC_ID}. It must be identical for
     * the index, update and delete requests so they all reach the same shard.
     */
    private static final String ROUTING = "2";

    private TransportClient     transportClient;

    /**
     * Builds the transport client and registers every address listed in
     * {@code TestConstant.esHosts} (comma-separated {@code host:port} entries).
     *
     * @throws UnknownHostException if a host name cannot be resolved
     * @throws IllegalArgumentException if an entry has no {@code :port} part
     */
    @Before
    public void init() throws UnknownHostException {
        Settings.Builder settingBuilder = Settings.builder();
        settingBuilder.put("cluster.name", TestConstant.clusterName);
        transportClient = new PreBuiltTransportClient(settingBuilder.build());

        for (String entry : TestConstant.esHosts.split(",")) {
            // Tolerate "a:9300, b:9300" and stray empty entries.
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            int separator = host.indexOf(":");
            if (separator < 0) {
                // Fail with a readable message instead of StringIndexOutOfBoundsException.
                throw new IllegalArgumentException("Invalid ES host entry, expected host:port but got: " + host);
            }
            InetAddress address = InetAddress.getByName(host.substring(0, separator));
            int port = Integer.parseInt(host.substring(separator + 1));
            transportClient.addTransportAddress(new TransportAddress(address, port));
        }
    }

    /**
     * Searches for the document with {@code _id} "1" and prints the runtime
     * class of the {@code data} field of every hit.
     */
    @Test
    public void test01() {
        SearchResponse response = transportClient.prepareSearch(INDEX)
            .setTypes(TYPE)
            .setQuery(QueryBuilders.termQuery("_id", "1"))
            .setSize(10000)
            .get();
        for (SearchHit hit : response.getHits()) {
            Object data = hit.getSourceAsMap().get("data");
            // A hit without a "data" field prints "null" rather than throwing.
            if (data == null) {
                System.out.println((Object) null);
            } else {
                System.out.println(data.getClass());
            }
        }
    }

    /** Indexes document {@link #DOC_ID} with routing {@link #ROUTING} via a bulk request. */
    @Test
    public void test02() {
        Map<String, Object> esFieldData = new LinkedHashMap<>();
        esFieldData.put("userId", 2L);
        esFieldData.put("eventId", 4L);
        esFieldData.put("eventName", "网络异常");
        esFieldData.put("description", "第四个事件信息");
        esFieldData.put("user_event", eventRelation());

        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        bulkRequestBuilder.add(
            transportClient.prepareIndex(INDEX, TYPE, DOC_ID).setRouting(ROUTING).setSource(esFieldData));
        commit(bulkRequestBuilder);
    }

    /** Partially updates document {@link #DOC_ID} with routing {@link #ROUTING} via a bulk request. */
    @Test
    public void test03() {
        Map<String, Object> esFieldData = new LinkedHashMap<>();
        esFieldData.put("userId", 2L);
        esFieldData.put("eventName", "网络异常1");
        esFieldData.put("user_event", eventRelation());

        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        bulkRequestBuilder.add(
            transportClient.prepareUpdate(INDEX, TYPE, DOC_ID).setRouting(ROUTING).setDoc(esFieldData));
        commit(bulkRequestBuilder);
    }

    /** Deletes document {@link #DOC_ID} via a bulk request. */
    @Test
    public void test04() {
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        // The document was written with an explicit routing value, so the delete
        // must carry the same value to be sent to the shard that holds it.
        bulkRequestBuilder.add(transportClient.prepareDelete(INDEX, TYPE, DOC_ID).setRouting(ROUTING));
        commit(bulkRequestBuilder);
    }

    /**
     * Builds the value stored under {@code user_event}: name "event", parent "2".
     * NOTE(review): this looks like an Elasticsearch join-field value; confirm
     * against the index mapping.
     */
    private Map<String, Object> eventRelation() {
        Map<String, Object> relations = new LinkedHashMap<>();
        relations.put("name", "event");
        relations.put("parent", ROUTING);
        return relations;
    }

    /**
     * Executes the bulk request (if it contains any action) and prints the
     * failure message of every failed item. NOT_FOUND failures are printed
     * as-is; all other failures are prefixed with "ES bulk commit error".
     *
     * @param bulkRequestBuilder the bulk request to execute
     */
    private void commit(BulkRequestBuilder bulkRequestBuilder) {
        if (bulkRequestBuilder.numberOfActions() <= 0) {
            return;
        }
        BulkResponse response = bulkRequestBuilder.execute().actionGet();
        if (!response.hasFailures()) {
            return;
        }
        for (BulkItemResponse itemResponse : response.getItems()) {
            if (!itemResponse.isFailed()) {
                continue;
            }
            if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
                System.out.println(itemResponse.getFailureMessage());
            } else {
                System.out.println("ES bulk commit error" + itemResponse.getFailureMessage());
            }
        }
    }

    /** Closes the transport client if {@link #init()} managed to create one. */
    @After
    public void after() {
        // JUnit runs @After even when @Before throws; the client may still be null.
        if (transportClient != null) {
            transportClient.close();
        }
    }
}