一、数据聚合
1. 概述
聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见有三类:
桶(Bcket)聚合:用来对文档做分组
- TermAggregation: 按照文档字段值分组
- Date Histogram: 按照日期阶梯分组,例如一周一组,或者一月为一组
度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum 等
管道(pipeline)聚合:结果为基础做聚合
注意:参与聚合的字段必须是:keyword、数值、日期、布尔
2. Bucket聚合
2.1 DSL 实现 Bucket聚合
统计所有数据中的酒店品牌有几种,此时可以根据酒店的名称做聚合。类型为 term类型。DSL实例如下。
GET /hotel/_search
{
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}
2.2 Bucket聚合-聚合结果排序
默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" // 按照_count升序排列
},
"size": 20
}
}
}
}
2.3 Bucket聚合-限定聚合范围
默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 // 只对200元以下的文档聚合
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
2.4 小结
aggs代表聚合,与query同级,此时query的作用是 限定聚合的的文档范围
聚合必须的三要素:聚合名称、聚合类型、聚合字段
聚合可配置属性有:
- size:指定聚合结果数量
- order:指定聚合结果排序方式
- field:指定聚合字段
3. Metrics 聚合
3.1 DSL实现 Metrics 聚合
获取每个品牌的用户评分的min、max、avg等值。我们可以利用 stats 聚合:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等
"field": "score" // 聚合字段,这里是score
}
}
}
}
}
}
4. RestAPI 实现聚合
以品牌聚合为例,演示下Java的RestClient使用,下图是DSL 语句和 java代码的对应关系。
请求组装
java 代码如下
@Testvoid testAggregation() throws IOException {// 1. 准备RequestSearchRequest request = new SearchRequest("hotel");// 2. 准备 DSL// 2.1 设置 sizerequest.source().size(0);//2.2 聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));// 3 发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4. 解析结果System.out.println(response);}
得到的结果是个 JSON 格式的数据,下面就要对聚合结果解析
聚合结果解析
java 代码块
// 4. 解析结果Aggregations aggregations = response.getAggregations();// 4.1 根据集合名称获取聚合结果Terms brandTerms = aggregations.get("brandAgg");// 4.2 获取buckets List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();// 4.3 遍历for (Terms.Bucket bucket : buckets) {// 获取 keyString key = bucket.getKeyAsString();System.out.println(key);}
整体代码
public class HotelSearchTest {private RestHighLevelClient client;// 客户端初始化@BeforeEachvoid setUp() {this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.30.130:9200") // 服务器IP + ES 端口));}@Testvoid testAggregation() throws IOException {// 1. 准备RequestSearchRequest request = new SearchRequest("hotel");// 2. 准备 DSL// 2.1 设置 sizerequest.source().size(0);//2.2 聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));// 3 发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4. 解析结果Aggregations aggregations = response.getAggregations();// 4.1 根据集合名称获取聚合结果Terms brandTerms = aggregations.get("brandAgg");// 4.2 获取buckets List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();// 4.3 遍历for (Terms.Bucket bucket : buckets) {// 获取 keyString key = bucket.getKeyAsString();System.out.println(key);}}@AfterEachvoid tearDown() throws IOException {this.client.close();}}
结果
二、自动补全
在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如电商网站搜索
1 安装拼音分词插件
要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址 https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:
- 解压
- 上传到虚拟机中,elasticsearch的plugin目录
- 重启elasticsearch
- 测试
POST /_analyze
{
"text": "如家酒店", // 分词内容
"analyzer": "pinyin" // 哪种分词器
}
如果是用的汉字,则使用的是ik分词器;拼音分词器使用的是pinyin分词器。
2 自定义分词器
elasticsearch中分词器(analyzer)的组成包含三部分:
- character filters(字符过滤器):分词之前对词条进行处理。例如删除字符、替换字符。
- tokenizer(分词器):指定分词器。
- tokenizer filter(过滤器):对分词后的词条进行处理。例如大小写转换、同义词处理、拼音处理等。
在创建索引库时,通过settings来配置自定义的analyzer(分词器),自定义分词器只能在当前索引下使用
PUT /test // 请求路径,创建一个名为 test 的索引
{
"settings": { // 设置索引的参数,用于配置索引的分析器和过滤器
"analysis": { // 分析器和过滤器的设置
"analyzer": { // 分析器的设置,可以自定义分词器名称
"my_analyzer": { // 自定义分词器名称
"tokenizer": "ik_max_word", // 分析器使用的分词器,这里使用IK分词器,可以将中文切分成单个单词。
"filter": "py" // 定义过滤器,这里定义一个名为py的过滤器
}
},
"filter": { // 过滤器的配置
"py": { // 过滤器名称,可自定义
"type": "pinyin", // 过滤器类型,这里是pinyin,表示把中文转换成拼音
"keep_full_pinyin": false, // 拼音转换后是否保留完整拼音之间的设置
"keep_joined_full_pinyin": true, // 拼音转换后是否保留完整拼音之间连字符的设置
"keep_original": true, // 是否保留原始文本的设置
"limit_first_letter_length": 16, // 拼音转换后首字母的长度限制
"remove_duplicated_term": true, // 是否去除重复结果的设置
"none_chinese_pinyin_tokenize": false // 是否对非中文文本进行拼音切分的设置
}
}
}
}
}
如果想在其他地方使用自定义分词器,需要指定自定义索引名。
POST /test/_analyze // test是自定义的索引名
{
"text": ["如家酒店还不错"],
"analyzer": "my_analyzer" // 自定义分词器
}
拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。因为当查询的时候,会把所有同音字的数据都返回。
解决方案:创建索引时,用自定义分词器,搜索时用ik分词器。
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name":{"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}
3. 自动补全
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型必须是 completion 类型。字段里内容就是补全的多个词条。
3.1 completion suggest 查询补全数据
// 创建索引库
PUT 索引库名{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
示例数据
// 示例数据
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
自动补全查询语法
// 自动补全查询
GET /索引库名/_search
{
"suggest": { // 实现搜索建议功能
"title_suggest": { // 建议查询的名称,可以自定义
"text": "s", // 用户输入的搜素内容,这里表示用户输入的内容是 “s"
"completion": { // 建议查询的类型,表示 通过补全的方式提供搜索建议
"field": "title", // 对哪个字段名称进行补全建议,表示 对 title 字段进行建议
"skip_duplicates": true, // 可选参数,是否跳过重复的结果
"size": 10 // 可选参数,获取前10条结果
}
}
}
}
3.2 RestAPI 自动补全
来看一下 java 代码 与 DSL 对应关系。
发送请求代码片段
@Testvoid testSuggest() throws IOException {// 1. 准备RequestSearchRequest request = new SearchRequest("hotel");// 2. 准备 DSL request.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix("h").skipDuplicates(true).size(10)));// 3. 发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4. 解析结果}
结果解析代码片段
结果解析代码片段
// 4. 解析结果Suggest suggest = response.getSuggest();// 4.1 根据名称获取补全结果, suggest.getSuggestion 的name 属性是 DSL 语句定义的名称CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");// 4.2 获取 optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();// 4.3 遍历for (CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();System.out.println(text);}
完整代码
package cn.itcast.hotel;import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;public class HotelSearchTest {private RestHighLevelClient client;// 客户端初始化@BeforeEachvoid setUp() {this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.30.130:9200") // 服务器IP + ES 端口));}@Testvoid testSuggest() throws IOException {// 1. 准备RequestSearchRequest request = new SearchRequest("hotel");// 2. 准备 DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix("h").skipDuplicates(true).size(10)));// 3. 发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4. 解析结果Suggest suggest = response.getSuggest();// 4.1 根据名称获取补全结果, suggest.getSuggestion 的name 属性是 DSL 语句定义的名称CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");// 4.2 获取 optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();// 4.3 遍历for (CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();System.out.println(text);}}@AfterEachvoid tearDown() throws IOException {this.client.close();}}
三、数据同步
1 数据同步的三种方式
elasticsearch中的数据来自数据库,当数据库中数据发生变化时,elasticsearch 也必须跟着改变,这个就是es 和数据库之间的数据同步。
在微服务中,负责操作数据库的业务和操作es的业务可能在两个不同的微服务上,如何实现数据同步呢?
方案一:同步调用
操作完数据库,在调用搜索的微服务模块,由搜索的微服务模块更新es。
优点:实现简单
缺点:耦合度高,两个微服务模块之间需要调用;性能低,需要依次执行各步骤。
方式二:异步通知
数据更新到数据库后,发送一条消息,由搜索的微服务监听发送的消息,最后更新es。
优点:耦合度低
缺点:依赖于消息中间件,考验中间件的可靠性,如MQ。
方式三:监听binlog
每当操作数据库时,都会把对应的操作记录在binlog中,通过中间件canal监听binlog,当数据发生变化时,就会通知搜索模块的微服务更新es。
优点:完全解除微服务耦合
缺点:开启binlog增加数据库负担,实现复杂度高。
四、elasticsearch集群
1 概述
单机的es做数据存储,面临海量数据存储和单点故障问题。
- 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点.
- 单点故障问题:将分片数据在不同节点备份
2. 搭建集群
2.1 创建es集群
在单机上测试可以利用docker容器运行多个es实例来模拟es集群。但是在生产环境推荐每台服务器仅部署一个es的实例。部署es集群可以直接使用docker-compose来完成。但要求Linux虚拟机至少有4G的内存空间。
2.1 创建es集群,这里利用3个docker容器搭建3个es集群,编写一个docker-compose.yml文件。作用是一个文件部署多个es实例,达到一键启动的效果。
version: '2.2'
services:es01:image: elasticsearch:7.12.1 #镜像container_name: es01 #容器名称environment: #环境变量- node.name=es01 #节点名称,每个容器都有独一无二的名称- cluster.name=es-docker-cluster # 集群名称,es会把所有集群名称一致的节点自动组装成同一个集群中- discovery.seed_hosts=es02,es03 # 集群中另外两个节点的ip地址。这里用的docker容器,容器互联可以直接用集群名称- cluster.initial_master_nodes=es01,es02,es03 # 初始化的主节点。主节点是需要选举的,这里表示有哪些节点可以是候选选举的- "ES_JAVA_OPTS=-Xms512m -Xmx512m" #配置JVM堆内存的内存大小,最小内存和最大内存配置为512兆volumes: #数据卷挂载- data01:/usr/share/elasticsearch/dataports: # 端口映射- 9200:9200networks: # 加入一个名为 elastic 的网络中- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge
2.2 修改 /etc/sysctl.conf 文件。es运行需要修改一些linux系统权限
vim /etc/sysctl.conf
2.3 添加下面的内容
vm.max_map_count=262144
2.4 然后执行命令,让配置生效:
sysctl -p
2.5 通过docker-compose启动集群:
docker-compose up -d
注:如果显示 docker-compose: 未找到命令...,需要安装docker-compose
下载安装docker-compose
curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
curl -L "https://mirror.ghproxy.com/https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
修改目录权限
chmod +x /usr/local/bin/docker-compose
创建软链接
ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
查看版本
docker-compose --version
2.3 集群监控状态
kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。
这里推荐使用 cerebro 来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro
解压好的目录如下:
进入bin目录,双击启动 cerbro.bat 脚本文件
输入服务器ip+cerebro端口,即可进入管理界面:如 http://192.168.30.130:9000/
在Node address 输入框输入 http://服务器ip+节点端口,点击Connect即可。如http://192.168.30.130:9201
绿色的条,代表集群处于健康状态
2.3 创建索引库
2.3.1 利用kibana的DevTools创建索引库
在DevTools中输入指令:
PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量,默认是1,不分片。这里分3片"number_of_replicas": 1 // 副本数量,给每个分片创建一个副本},"mappings": {"properties": {// mapping映射定义 ...}}
}
3. 集群职责及脑裂
elasticsearch中集群节点有不同的职责划分
master eligible节点的作用是什么?
- 1. 参与集群选主
- 2. 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
data节点的作用是什么?
1. 数据的CRUD
coordinator节点的作用是什么?
1. 路由请求到其它节点
2. 合并查询到的结果,返回给用户
默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
脑裂:一个集群出现了两个节点。
选取主节点:floor(N / 2) + 1 , N为节点数量,即节点数量除二加一的值向下取整。
discovery.zen.minimum_master_nodes
:这是一个非常重要的配置参数,用于防止脑裂问题。它指定了在选举过程中需要的最少主节点候选者数量。推荐设置为 (master_eligible_nodes / 2) + 1
,其中 master_eligible_nodes
是候选主节点的数量。
node.master
:此设置决定了一个节点是否有资格参与主节点选举。设置为 true
的节点可以成为主节点;设置为 false
的节点不能成为主节点。
3. 分布式新增和查询流程
当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
// 确定分片算法
shard = hash(_routing) % number_of_shards
说明:
_routing 默认是文档的 id
number_of_shards 是分片数量。算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
新增文档流程
查询流程
elasticsearch的查询分成两个阶段:
• scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
• gather phase:聚集阶段,将查询结果汇总到coordinating node ,整理并返回给用户
4. 故障转移
集群中的主节点会监控集群中其他的节点,如果有节点发生了宕机,会把宕机的分片数据迁移到其他的节点上。如果是主节点发生宕机,会重新选取主节点,然后由新的主节点把宕机的分片迁移到其他节点上,确保数据安全,这就是故障转移。
如下图,假设node1是主节点宕机了,重新选取主节点为node2,。此时,node2会去查看宕机节点的分片,即 p-0 和 R - 1两个分片,把宕机节点的分片迁移到 node 2 或 node 3上。
in