在Elasticsearch7.15.0之后,官方提供了elasticsearch-java包作为java客户端工具包,用于取代elasticsearch-rest-high-level-client,其底层依然依赖Elasticsearch Low Level REST 客户端,即elasticsearch-rest-client。
elasticsearch-java客户端的特点:
- 对象构造基于生成器模式。
- 可以使用构建器 lambda 构建嵌套对象,从而允许使用简洁且富有表现力的类似 DSL 的代码。
一、引入依赖包
maven的pom文件
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.16.1</version>
</dependency>
gradle的build文件
dependencies {implementation 'co.elastic.clients:elasticsearch-java:8.16.1'
}
二、生成DSL语句样例
1、简易match_all
public static String matchAllDsl() {//MatchAllQuery matchAllQuery = MatchAllQuery.of(m -> m);MatchAllQuery matchAllQuery = new MatchAllQuery.Builder().build();SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchAllQuery._toQuery()).size(10));return convert(searchRequest);
}
{"query":{"match_all":{}},"size":10}
2、sort的match_all
public static String matchAllSortDsl() {MatchAllQuery matchAllQuery = MatchAllQuery.of(m -> m);List<SortOptions> sortOptionsList = new ArrayList<>();sortOptionsList.add(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("tenantId")))));sortOptionsList.add(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("id").order(SortOrder.Desc)))));SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchAllQuery._toQuery()).size(10).sort(sortOptionsList));return convert(searchRequest);
}
{"query": {"match_all": {}},"size": 10,"sort": [{"tenantId": {}}, {"id": {"order": "desc"}}]
}
3、简易match
public static String matchDsl() {MatchQuery matchQuery = MatchQuery.of(m -> m.field("tenantId").query("IDSWVV2W"));SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchQuery._toQuery()).size(10));return convert(searchRequest);
}
{"query":{"match":{"tenantId":{"query":"IDSWVV2W"}}},"size":10}
4、高亮的match
public static String matchHighlightDsl() {MatchQuery matchQuery = MatchQuery.of(m -> m.field("tenantId").query("IDSWVV2W"));Highlight highlight = Highlight.of(h -> h.fields("product_name", f -> f));SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchQuery._toQuery()).size(10).highlight(highlight));return convert(searchRequest);
}
{"highlight": {"fields": {"product_name": {"fragment_offset": 10}}},"query": {"match": {"tenantId": {"query": "IDSWVV2W"}}},"size": 10
}
5、source的match
public static String matchSourceDsl() {MatchQuery matchQuery = MatchQuery.of(m -> m.field("tenantId").query("IDSWVV2W"));List<SortOptions> sortOptionsList = new ArrayList<>();sortOptionsList.add(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("tenantId")))));sortOptionsList.add(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("id").order(SortOrder.Desc)))));SourceConfig sourceConfig = SourceConfig.of(s ->s.filter(f -> f.includes("id", "tenantId").excludes("operateDate")));SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchQuery._toQuery()).size(10).sort(sortOptionsList).source(sourceConfig));return convert(searchRequest);
}
{"_source": {"excludes": ["operateDate"],"includes": ["id", "tenantId"]},"query": {"match": {"tenantId": {"query": "IDSWVV2W"}}},"size": 10,"sort": [{"tenantId": {}}, {"id": {"order": "desc"}}]
}
6、简易term
public static String termDsl() {TermQuery termQuery = TermQuery.of(t -> t.field("tenantId").value("IDSWVV2W"));SearchRequest searchRequest = SearchRequest.of(s -> s.query(termQuery._toQuery()).size(10));return convert(searchRequest);}
{"query":{"term":{"tenantId":{"value":"IDSWVV2W"}}},"size":10}
7、简易match_phrase
public static String matchPhraseDsl() {MatchPhraseQuery phraseQuery = MatchPhraseQuery.of(m -> m.field("tenantId").query("IDSWVV2W").slop(0).zeroTermsQuery(ZeroTermsQuery.None).boost(1f));SearchRequest searchRequest = SearchRequest.of(s -> s.query(phraseQuery._toQuery()).size(10));return convert(searchRequest);}
{"query": {"match_phrase": {"tenantId": {"boost": 1.0,"query": "IDSWVV2W","slop": 0,"zero_terms_query": "none"}}},"size": 10
}
8、简易range
public static String dateRangeDsl() {DateRangeQuery dateRangeQuery = DateRangeQuery.of(d -> d.field("createdDate").gt("1622476800000").lt("1625068799999").boost(1f));RangeQuery rangeQuery = RangeQuery.of(r -> r.date(dateRangeQuery));SearchRequest searchRequest = SearchRequest.of(s -> s.query(rangeQuery._toQuery()).size(10));return convert(searchRequest);}
{"query":{"range":{"createdDate":{"boost":1.0,"gt":"1622476800000","lt":"1625068799999"}}},"size":10}
9、简易bool
public static String queryBoolDsl() {MatchQuery matchQuery = MatchQuery.of(m -> m.field("tenantId").query("IDSWVV2W"));TermQuery termQuery = TermQuery.of(t -> t.field("ts").value(1732260046231L));BoolQuery boolQuery = BoolQuery.of(b -> b.must(matchQuery._toQuery(), termQuery._toQuery()));SearchRequest searchRequest = SearchRequest.of(s -> s.query(boolQuery._toQuery()).size(10));return convert(searchRequest);}
{"query":{"bool":{"must":[{"match":{"tenantId":{"query":"IDSWVV2W"}}},{"term":{"ts":{"value":1732260046231}}}]}},"size":10}
10、简易filter
public static String boolFilterDsl() {/*filter查询就是用于精确过滤文档,它只关注文档是否符合条件,将匹配的文档包含在结果中。他们都不进行打分、排序或相关性计算,只担心是否匹配。*/TermQuery termQuery = TermQuery.of(t -> t.field("ts").value(1732260046231L));BoolQuery boolQuery = BoolQuery.of(b -> b.filter(termQuery._toQuery()));SearchRequest searchRequest = SearchRequest.of(s -> s.query(boolQuery._toQuery()).size(10));return convert(searchRequest);}
{"query":{"bool":{"filter":[{"term":{"ts":{"value":1732260046231}}}]}},"size":10}
11、简易exist,查询字段为空的数据
public static String fieldIsEmptyDsl() {ExistsQuery existsQuery = ExistsQuery.of(e -> e.field("content"));BoolQuery boolQuery = BoolQuery.of(b -> b.mustNot(existsQuery._toQuery()));SearchRequest searchRequest = SearchRequest.of(s -> s.query(boolQuery._toQuery()).size(10));return convert(searchRequest);}
{"query":{"bool":{"must_not":[{"exists":{"field":"content"}}]}},"size":10}
12、and和or组合
public static String mustShouldDsl() {TermQuery term1 = TermQuery.of(t -> t.field("tenant_id").value("r9au1bbd"));TermQuery term2 = TermQuery.of(t -> t.field("enable").value("1"));ExistsQuery existsQuery1 = ExistsQuery.of(e -> e.field("user_id").boost(1f));MatchPhraseQuery phraseQuery = MatchPhraseQuery.of(m -> m.field("staff_name").query("neng").slop(0).zeroTermsQuery(ZeroTermsQuery.None).boost(1f));TermQuery term3 = TermQuery.of(t -> t.field("user_email").value("玄"));TermQuery term4 = TermQuery.of(t -> t.field("user_name_pinyin").value("玄"));MatchQuery matchQuery1 = MatchQuery.of(m -> m.field("user_name").query("玄"));MatchQuery matchQuery2 = MatchQuery.of(m -> m.field("staff_mobile").query("玄"));RegexpQuery regexpQuery = RegexpQuery.of(r -> r.field("staff_email").value(".*neng.*").flags("65535").maxDeterminizedStates(10000).boost(1f));SortOptions sortOptions = SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("staff_creationtime"))));TrackHits trackHits = TrackHits.of(t -> t.count(2147483647));BoolQuery shouldBool = BoolQuery.of(b -> b.should(phraseQuery._toQuery(), term3._toQuery(), term4._toQuery(), matchQuery1._toQuery(), matchQuery2._toQuery(), regexpQuery._toQuery()));BoolQuery boolQuery = BoolQuery.of(b -> b.must(term1._toQuery(), term2._toQuery(), existsQuery1._toQuery(), shouldBool._toQuery()));SearchRequest searchRequest = SearchRequest.of(s -> s.query(boolQuery._toQuery()).size(10).from(0).sort(sortOptions).trackTotalHits(trackHits));return convert(searchRequest);}
{"from": 0,"query": {"bool": {"must": [{"term": {"tenant_id": {"value": "r9au1bbd"}}}, {"term": {"enable": {"value": "1"}}}, {"exists": {"boost": 1.0,"field": "user_id"}}, {"bool": {"should": [{"match_phrase": {"staff_name": {"boost": 1.0,"query": "neng","slop": 0,"zero_terms_query": "none"}}}, {"term": {"user_email": {"value": "玄"}}}, {"term": {"user_name_pinyin": {"value": "玄"}}}, {"match": {"user_name": {"query": "玄"}}}, {"match": {"staff_mobile": {"query": "玄"}}}, {"regexp": {"staff_email": {"boost": 1.0,"flags": "65535","max_determinized_states": 10000,"value": ".*neng.*"}}}]}}]}},"size": 10,"sort": [{"staff_creationtime": {}}],"track_total_hits": 2147483647
}
13、聚合-按时间统计排序
public static SearchRequest aggregationsDsl() {TermQuery term1 = TermQuery.of(t -> t.field("tenant_id").value("r9au1bbd"));DateRangeQuery dateRangeQuery = DateRangeQuery.of(d -> d.field("createdDate").gt("1622476800000").lt("1625068799999").boost(1f));RangeQuery rangeQuery = RangeQuery.of(r -> r.date(dateRangeQuery));BoolQuery boolQuery = BoolQuery.of(b -> b.must(term1._toQuery(), rangeQuery._toQuery()).boost(1f));SortOptions sortOptions = SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("createdDate"))));Aggregation valueCountAgg = Aggregation.of(a -> a.valueCount(v -> v.field("id")));NamedValue<SortOrder> namedValue = new NamedValue<>("_key", SortOrder.Asc);Aggregation dateHistogramAgg = Aggregation.of(a -> a.dateHistogram(d -> d.field("createdDate").format("yyyy-MM-dd").calendarInterval(CalendarInterval.Day).timeZone("+08:00").offset(o -> o.offset(0)).keyed(false).minDocCount(0).order(Collections.singletonList(namedValue))));SearchRequest searchRequest = SearchRequest.of(s -> s.query(boolQuery._toQuery()).size(0).sort(sortOptions).aggregations("dateList",Aggregation.of(a -> a.dateHistogram(dateHistogramAgg.dateHistogram()).aggregations("countNum", valueCountAgg))));return searchRequest;}
{"aggregations": {"dateList": {"aggregations": {"countNum": {"value_count": {"field": "id"}}},"date_histogram": {"calendar_interval": "day","field": "createdDate","format": "yyyy-MM-dd","min_doc_count": 0,"offset": 0,"order": [{"_key": "asc"}],"time_zone": "+08:00","keyed": false}}},"query": {"bool": {"boost": 1.0,"must": [{"term": {"tenant_id": {"value": "r9au1bbd"}}}, {"range": {"createdDate": {"boost": 1.0,"gt": "1622476800000","lt": "1625068799999"}}}]}},"size": 0,"sort": [{"createdDate": {}}]
}
#获取SearchRequest的dsl语句
private static String convert(SearchRequest request) {// JacksonJsonpMapper mapper = new JacksonJsonpMapper();String string = JsonpUtils.toJsonString(request, SimpleJsonpMapper.INSTANCE);System.out.println(string);return string;
}
三、索引API操作
1、查询索引数据
public static void searchTest() {ClusterInfo clusterInfo = new ClusterInfo();clusterInfo.setHosts("172.20.10.14:9200");clusterInfo.setUserName("admin");clusterInfo.setPwd("admin");ElasticsearchClient client = ElasticsearchJavaClient.buildClient(clusterInfo);try {MatchAllQuery matchAllQuery = MatchAllQuery.of(m -> m);SearchRequest searchRequest = SearchRequest.of(s -> s.query(matchAllQuery._toQuery()).size(10).index("logs-2020-01-01"));SearchResponse<LogsDocEntity> response = client.search(searchRequest, LogsDocEntity.class);HitsMetadata<LogsDocEntity> hits = response.hits();for (Hit<LogsDocEntity> hit : hits.hits()) {LogsDocEntity source = hit.source();System.out.println(source.getTimestamp());}} catch (IOException e) {throw new RuntimeException(e);}}
#生成ElasticsearchClient的实现类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.yonyou.iuap.searchclient.pojo.ClusterInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;import java.io.IOException;public class ElasticsearchJavaClient {public static ElasticsearchClient buildClient(ClusterInfo clusterInfo) {String hosts = clusterInfo.getHosts();String userName = clusterInfo.getUserName();String pwdStr = clusterInfo.getPwd();boolean credentialsFlag = StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(pwdStr);final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();if (credentialsFlag) {credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, pwdStr));}RestClientBuilder clientBuilder = RestClient.builder(generateHostArray(hosts, "http"));final int[] connectTimeout = {10000};final int[] socketTimeout = {30000};final int[] connectionRequestTimeout = {10000};// 默认保持30分钟final int[] keepAliveMs = {1800000};// 异步httpclient连接延时配置clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {requestConfigBuilder.setConnectTimeout(connectTimeout[0]);requestConfigBuilder.setSocketTimeout(socketTimeout[0]);requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout[0]);return requestConfigBuilder;});// 异步httpclient配置clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {if (credentialsFlag) {httpClientBuilder.disableAuthCaching();httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);}// httpclient保活策略httpClientBuilder.setKeepAliveStrategy((resp, context) -> keepAliveMs[0]);return httpClientBuilder;});RestClient client = clientBuilder.build();if (client.isRunning()) {ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());return new ElasticsearchClient(transport);}throw new RuntimeException("Cluster unable to connect!!!");}public static void closeClient(ElasticsearchClient restClient) {try {if (restClient != null && restClient.ping().value()) {restClient.close();}} catch (IOException e) {throw new RuntimeException(e);}}public static HttpHost[] generateHostArray(String hosts, String scheme) {String[] hostsArray = hosts.split(",");HttpHost[] httpHostArray = new HttpHost[hostsArray.length];for (int i = 0; i < hostsArray.length; i++) {
// String[] hp = hostsArray[i].split(CommonConstants.COLON_STR);String[] hp = getIpAndPort(false, hostsArray[i]);String host = null, port = null;if (hp.length == 2) {host = hp[0];port = hp[1];} else if (hp.length == 1) {host = hp[0];port = "9200";}httpHostArray[i] = new HttpHost(host, Integer.parseInt(port), scheme);}return httpHostArray;}private static String[] getIpAndPort(boolean isIpv6, String address) {String[] ippt = address.split(":");if (isIpv6) {String[] stv6 = new String[2];stv6[1] = ippt[ippt.length - 1];stv6[0] = address.substring(0, address.length() - stv6[1].length() - 1);return stv6;} else {return ippt;}}}
待更新。。。