ElasticSearch 高级使用
数据聚合
聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类
桶(Bucket)
桶是用来对文档做分组的,类似MySQL中的 Group By 的分组
相关聚合统计方式有多种,可以使查看以下链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket.html
TermAggregation
按照文档固定字段值分组
语法:
- GET /unsoft/_search
- {
- "size": 0, # 在检索时不显示任何一条文档
- "aggs": { # 做数据聚合
- "brandAgg": { # 起一个聚合的名字,会在后面查询数据时存在聚合数据分组名称
- "terms": { # 使用 TermAggregation 固定聚合
- "field": "brand", # 需要聚合的字段
- "size": 20, # 要分多少个组,默认只会给你返回10个分组
- "order": { # 可选 排序属性
- "_count": "asc" # 默认为降序排序,可以设置为分组类的升序排序
- }
- }
- }
- }
- }
ES 默认对整个索引库进行聚合操作,但如果你的一个索引库包含上千万上亿条数据时,做一次聚合会非常耗费性能的,这时我们可以增加query检索字段,在检索出来的结果基础上做聚合统计
- # Term 固定字段的检索结果聚合统计
- GET /unsoft/_search
- {
- "query": {
- "range": {
- "price": {
- "lte": 200, # 使用query先检索价格低于200的文档,再基于这个结果做聚合
- }
- }
- },
- "size": 0, # 查询出来的文档不进行显示,所以 hits 中不会有数据
-
- "aggs": { # 对检索出来的数据进行聚合统计
- "brandAgg": {
- "terms": {
- "field": "brand",
- "size": 20
- }
- }
- }
- }
DataHistogram
按照日期阶梯分组,例如一周为一组,或擤一月为一组
度量(Metric)
按照某种度量的方式进行统计,如最大值、最小值、平均值等
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics.html
示例:
- # 巅套聚合
- GET /unsoft/_search
- {
- "query": {
- "range": {
- "price": {
- "lte": 200
- }
- }
- },
- "size": 0,
-
- "aggs": {
- "brandAgg": {
- "terms": {
- "field": "brand",
- "size": 20,
- "order": {
- "scoreAgg.avg": "asc" # 如果想使用度量聚合的值作为排序,使用 聚合名.值变量名来获取
- }
- },
- "aggs": { # 基于桶聚合嵌套度量聚合
- "scoreAgg": { # 用于存储统计的数据聚合名
- "stats": { # 度量聚合使用关键字 stats
- "field": "score",
- }
- }
- }
- }
- }
- }
管道(pipeline)
其它聚合的结果为基础再做聚合
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-pipeline.html
Java Client 实现聚合
- /**
- * 桶的固定聚合统计
- * terms 固定聚合统计可以分组字段,与Mysql 中的 Group by 类似
- * @return
- */
- @GetMapping("/bucket")
- public String bucket() throws IOException {
-
- SearchRequest searchRequest = new SearchRequest.Builder()
- // 声明查询的索引库
- .index("goods")
- // 创建索引
- .query(
- // 第一步:声明搜索索引条件查询文档
- query->query.range(
- rand->rand.field("price").lt(JsonData.of(200))
- )
- )
- // 第二步:设置聚合
- .aggregations("priceAgg",agg->agg.terms(
- // 使用固定关键词的聚合统计
- terms->terms
- .field("price")
- .size(20)
- .order(new NamedValue<>("priceStats.avg", SortOrder.Desc)) // 如果要使用度量聚合的数据作为排序,可使用 . 语法
- // 在固定关键词聚合的基础上嵌套度量聚合
- ).aggregations("priceStats",priceStats->priceStats.stats(
- // 使用价格作为计算度量聚合值
- s->s.field("price")
- ))
- )
- .build();
-
- SearchResponse<Goods> response = client.search(searchRequest, Goods.class);
- return response.toString();
- }
案例
把索引库中的 price价格,star星级,city城市 三个做聚合,并把聚合结果以 Map 方式输出。
我们先来看看 DSL 语句的构建:
- # 多聚合
- GET /goods/_search
- {
- "size": 0,
- "aggs": {
- "priceAgg": {
- "terms": {
- "field": "price",
- "size": 100
- }
- },
- "starAgg":{
- "terms": {
- "field": "star",
- "size": 100
- }
- },
- "cityAgg":{
- "terms": {
- "field": "city",
- "size": 100
- }
- }
- }
- }
-
使用JavaClient 代码如下:
- public class AggregationServiceImpl implements AggregationsService {
-
- // 获取 Java Client 客户端
- @Resource
- private ElasticsearchClient client;
-
- // 实现封装多聚合的方法
- @Override
- public Map<String, List<String>> filter() {
-
-
- /* 第一步:创建一个检索对象 */
- SearchRequest.Builder searchBuild = new SearchRequest.Builder()
- .size(0);
- // 对三个字段进行聚合
- addAgg(searchBuild, "price");
- addAgg(searchBuild, "star");
- addAgg(searchBuild, "city");
-
- SearchRequest searchRequest = searchBuild.build();
- SearchResponse<Void> response;
- /* 第二步:提交请求检索 */
- try {
- response = client.search(searchRequest, Void.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- /* 第三步:检索出的结果在response中获取 */
- Map<String, Aggregate> aggregations = response.aggregations();
- Map<String, List<String>> result = new HashMap<>();
-
- // 这里的 aggregations 包含了上面三个聚合的数据,我们通过遍历逐个取出
- aggregations.forEach((key, aggregation) -> {
-
- // term 固定字段聚合分为 Long 整型的聚合和 String 字符型的聚合,所以要先进行判断
- if (aggregation.isLterms()){
- Buckets<LongTermsBucket> buckets = aggregation.lterms().buckets();
- /* 注意:本身一个聚合的桶有多个聚合字段,所以也是一个数组,所以需要先获得这个桶的数组数据 */
- List<LongTermsBucket> array = buckets.array();
- ArrayList<String> list = new ArrayList<>();
- /* 再对数组中的每一个字段获取它的聚合结果 */
- for (LongTermsBucket longTermsBucket : array) {
- System.out.println(longTermsBucket.key());
- list.add(longTermsBucket.key());
- }
- result.put(key,list);
- }
-
- // 判断是否是字符型的聚合
- if (aggregation.isSterms()){
- System.out.println(aggregation.sterms().buckets());
- }
-
- });
-
- return result;
- }
-
- // 创建聚合的方法
- private SearchRequest.Builder addAgg(SearchRequest.Builder builder, String aggName) {
- builder.aggregations(
- aggName + "Agg",
- // 使用固定字段聚合的方式
- agg -> agg.terms(
- t -> t.field(aggName)
- .size(100)
- )
- );
- return builder;
- }
- }
自动补全
自动补全指的是当输入某些关键字时,系统会自动根据用户当前输入的基础数据进行相关关键词的联想补全。
拼音分词器
1.安装拼音分词器,根据不同版本的ES找对这版本的拼音分词器版本即可
https://github.com/medcl/elasticsearch-analysis-pinyin
2.把分词器解压到一个文件夹,并放到ES的plugin文件夹中,重启ES即可。
3.在使用词语分析时,要使用拼音分词器pinyin,即可
- GET /_analyze
- {
- "text": ["我们都去玩吧"],
- "analyzer": "pinyin"
- }
拼音分词器可进行自定义配置,详细配置可以在github说明中查看,以下是转载github的配置选项:
https://github.com/medcl/elasticsearch-analysis-pinyin
** 可选参数 **
keep_first_letter
开启这个选项, 例如:刘德华
>ldh
, 默认: truekeep_separate_first_letter
开启这个选项, 保留首字母的词条, eg:刘德华
>l
,d
,h
, 默认: false,注意:查询结果可能会因为词频过大而过于模糊limit_first_letter_length
设置第一个字母的最大长度, default: 16keep_full_pinyin
开启这个选项, eg:刘德华
> [liu
,de
,hua
], default: truekeep_joined_full_pinyin
开启这个选项, eg:刘德华
> [liudehua
], default: falsekeep_none_chinese
保留非中文字母或数字,
default: truekeep_none_chinese_together
把非中文字母放在一起, default: true, eg:DJ音乐家
->DJ
,yin
,yue
,jia
, 如果设置为false
, eg:DJ音乐家
->D
,J
,yin
,yue
,jia
, NOTE:keep_none_chinese
要先开启keep_none_chinese_in_first_letter
将非中文字母放在首字母中, eg:刘德华AT2016
->ldhat2016
, default: truekeep_none_chinese_in_joined_full_pinyin
保持非中文字母为连体拼音, eg:刘德华2016
->liudehua2016
, default: falsenone_chinese_pinyin_tokenize
如果非汉语字母是拼音,则将其拆分成单独的拼音, default: true, eg:liudehuaalibaba13zhuanghan
->liu
,de
,hua
,a
,li
,ba
,ba
,13
,zhuang
,han
, 注意:keep_none_chinese
和keep_none_chinese_together
要先开启keep_original
开启这个选项, 除了把词条转为拼音外,还会保留原来的词条, default: falselowercase
小写非中文字母,, default: truetrim_whitespace
default: trueremove_duplicated_term
开启这个选项, 重复的词将被删除以保存索引, eg:de的
>de
, default: false, 注意:职位相关查询可能受影响ignore_pinyin_offset
6.0之后,偏移量是严格限制的,不允许有重叠标记,有了这个参数,重叠标记将允许忽略偏移量,请注意,所有位置相关的查询或高亮将变得不正确,您应该使用多个字段,并根据不同的查询目的指定不同的设置。如果需要偏移量,请将其设置为false. default: true.
拼音分词器的前期问题
- GET /_analyze
- {
- "text": ["你们好"],
- "analyzer": "pinyin"
- }
-
-
- 结果:
- {
- "tokens" : [
- {
- "token" : "ni",
- "start_offset" : 0,
- "end_offset" : 0,
- "type" : "word",
- "position" : 0
- },
- {
- "token" : "nmh",
- "start_offset" : 0,
- "end_offset" : 0,
- "type" : "word",
- "position" : 0
- },
- {
- "token" : "men",
- "start_offset" : 0,
- "end_offset" : 0,
- "type" : "word",
- "position" : 1
- },
- {
- "token" : "hao",
- "start_offset" : 0,
- "end_offset" : 0,
- "type" : "word",
- "position" : 2
- }
- ]
- }
以上图执行结果为例,当我们使用拼音分词器切割文本分词时,我们发现有以下几个问题:
1.被拼音分词器分词后,拼音分词器没有做分词功能,而是把每一个中文字都转为拼音了
2.被拼音分词器分词后,中文分词被去除了
因此我们应在配置拼音分词时,对不同的情况配置是否使用拼音分词。
请看下一节《分词概念》
分词概念
ES 的分词器的组成中,包含三个组件:
character filters:对文本中的特殊字符进行文本化,如 🙂 这样的表情可以转为“开心”等的操作
tokenizer:将文本按一定的规则切割成词条。即 ik 分词主要的工作流程,如果是 keyword 则不进行分词
tokenizer filter:将 tokenizer 输出的词条做进一步处理,例如大小写转换、同义词处理、拼音转化等,拼音分词就在这里处理的,为分词器的最后一步。
以下图为说明分词三大组件的概念:
自定义分词器
通过上一节的说明,我们知道分词器是由三个组件组成的,那么我们就可以利用这三个组件在不同的运行阶段定义不同的分词器,如tokenizer阶段我们使用ik分词器,而tokenizer filter阶段我们则使用拼音分词器。
具体配置如下:
- PUT /test
- {
- "settings": { // 在创建索引库时可定义用于这个索引库的配置
- "analysis": { // 分析配置
- "analyzer": { // 在分词配置上我们做自定义分词器
- "my_analyzer": { // 分词器名称
- "tokenizer": "ik_max_word", // 在tokenizer阶段我们使用ik进行分词
- "filter": "pinyin" // 在tokenizer阶段之后,我们把分好的词做拼音转化
- }
- }
- }
- }
- }
-
这样的配置依然存在问题,因为拼音分词器并没有做自定义配置,这使得 tokenizer filter 阶段使用拼音分词器后,拼音分词器会把原有的中文分词给删掉(这是拼音分词器的默认配置引起的)
所以我们要进一步对拼音分词器做自定义配置:
- PUT /test
- {
- "settings": {
- "analysis": {
- "analyzer": { // 自定义分词器
- "my_analyzer": { // 分词器名称
- "tokenizer": "ik_max_word",
- "filter": "py" // 这里引用下面自定义配置的分词器
- }
- },
- // 可以对 tokenizer filter 阶段做详细的配置
- "filter": { // 自定义tokenizer filter
- "py": { // 过滤器名称
- "type": "pinyin", // 过滤器类型,所应用的分词器为 pintin 分词器
- "keep_full_pinyin": false, // 关闭不能再分割的词,还拆分转为拼音功能
- "keep_joined_full_pinyin": true, // 开启不能再分割的词的拼音转换
- "keep_original": true, // 保留ik分词后的中文词
- "limit_first_letter_length": 16, // 设置第一个字母的最大长度
- "remove_duplicated_term": true, // 删除重复词
- "none_chinese_pinyin_tokenize": false // 不对全拼音的分割处理 如 liudehua -> [liu,de,hua]
- }
- }
- }
- }
- }
-
通过以上的配置后,拼音分词器做到主要功能包括:不删除原有的中文词,不对不能分割的词做拼音分割。
为字段配置拼音分词器
在以往我们配置 ik 分词时,是只对某个索引库中的某个字段进行配置的,因此我们如果想要使用自定义配置好的分词器,我们需要在 settings 中先定义好,然后在下面的字段中声明使用该分词器即可:
- PUT /test
- {
- "settings": {
- "analysis": {
- "analyzer": {
- "My_analyzer":{
- "tokenizer":"ik_max_word",
- "filter":"py"
- }
- },
- "filter": {
- "py": {
- "type": "pinyin",
- "keep_full_pinyin":false,
- "keep_joined_full_pinyin":true,
- "keep_original":true,
- "limit_first_letter_length":16,
- "remove_duplicated_term":true,
- "none_chinese_pinyin_tokenize":false
- }
- }
- }
- },
- "mappings": {
- "properties": {
- "name":{
- "type": "text",
- "analyzer": "My_analyzer"
- }
- }
- }
- }
存在问题
当我们配置好分词器后,我们依然存在一个问题,看一下代码
- // 加入第一个文档
- PUT /test/_doc/1
- {
- "name":"狮子"
- }
-
- // 加入第二个文档
- PUT /test/_doc/2
- {
- "name":"世子"
- }
-
- // 检索
- GET /test/_search
- {
- "query": {
- "match": {
- "name": "狮子进了山洞"
- }
- }
- }
这时我们发现,检索的结果有问题:
- {
- "took" : 9,
- "timed_out" : false,
- "_shards" : {
- "total" : 1,
- "successful" : 1,
- "skipped" : 0,
- "failed" : 0
- },
- "hits" : {
- "total" : {
- "value" : 2,
- "relation" : "eq"
- },
- "max_score" : 0.33425623,
- "hits" : [
- {
- "_index" : "test",
- "_type" : "_doc",
- "_id" : "1",
- "_score" : 0.33425623,
- "_source" : {
- "name" : "狮子"
- }
- },
- {
- "_index" : "test",
- "_type" : "_doc",
- "_id" : "2",
- "_score" : 0.3085442,
- "_source" : {
- "name" : "世子"
- }
- }
- ]
- }
- }
查询文本中,并没有包含 shizi 或 世子 这些词,但为什么 世子 会被检索出来呢?
这是因为,ES在插入和检索过种都使用了自定义分词器,其检索分词如下图所示:
这是因为,检索的文本会先进行分词,再把分好的词进行查询操作,其中 "狮子" 会被拼音分词转化为 shizi 这一词,而 世子 的词条中就包含了 "shizi" ,所以引起的
要解决这个问题,我们应该配置ES在检索时,不对检索文本做拼音分词操作,在创建索引库时,单独设置字段的搜索分词器,具体如下:
- PUT /test
- {
- "settings": {
- "analysis": {
- "analyzer": {
- "My_analyzer":{
- "tokenizer":"ik_max_word",
- "filter":"py"
- }
- },
- "filter": {
- "py": {
- "type": "pinyin",
- "keep_full_pinyin":false,
- "keep_joined_full_pinyin":true,
- "keep_original":true,
- "limit_first_letter_length":16,
- "remove_duplicated_term":true,
- "none_chinese_pinyin_tokenize":false
- }
- }
- }
- },
- "mappings": {
- "properties": {
- "name":{
- "type": "text",
- "analyzer": "My_analyzer",
- "search_analyzer": "ik_max_word"
- }
- }
- }
- }
自动补全查询
我们之前对于文本类型的可以分为“text”和“keyword”两种,而自动补全是第三种文本类型“completion”,定义自动补全类型方式如下:
- PUT /comptest
- {
- "mappings":{
- "properties":{
- "title":{
- "type":"completion"
- }
- }
- }
- }
这种类型是一个文本数组,当我们需要往这个字段中写入数据时,可通过以下代码方式:
- POST /comptest/_doc
- {
- "title":["apple","iphone"]
- }
-
- POST /comptest/_doc
- {
- "title":["huawei","mtae60pro"]
- }
-
- POST /comptest/_doc
- {
- "title":["xiaomi","redmi60p"]
- }
自动补全查询
- GET /comptest/_search
- {
- "suggest": { // 使用补全查询
- "title_suggest": { // 自定一个自动补全的名称
- "text": "i", // 自动补充的开头文本
- "completion":{ // 使用自动补全类型
- "field":"title", // 要自动补全的字段,前提是该字段需要是completion类型
- "skip_duplicates":true, // 跳过重复的文档
- "size":10 // 每次只显示10条
- }
- }
- }
- }
数据同步
ES 中的数据来自mysql数据库,因此如果mysql数据发生改变时,ES也必须跟着改变,这个就是ES与mysql之间的数据同步
数据同步通常有三种方案,分别如下:
1.采用同步方案,即ES服务中增加REST接口,当数据发生增删改的时候,向ES服务调用相应的接口。
方案一的缺点非常明显,有以下两个缺点:
- 解耦度不高,admin服务代码中包含了调用ES服务的代码
- admin服务调用ES服务时还需要等待ES服务的响应,执行时间变长,效率不高
显然不推荐使用这个方案。
2.采用配合RabbitMQ队列方式,admin服务出现增删改的操作时,往RabbitMQ中发送任务队列,当ES服务收到队列任务时,去同步处理ES中的数据
缺点:虽然解决了解耦问题,但相对来说依然存在解耦的问题,但基本上足够使用。
3.采用mysql数据库开启 binlog,并使用 canal 中间件监听 binlog 通知ES服务进行同步更新,使用 binlog 可以彻底解决解耦问题
缺点:binlog 是来自Mysql数据库服务的功能,开启binlog会加重mysql的性能占用,但这种方法是最好的。
使用RabbitMQ方案做数据同步
因为canal我们还没有接触,所以使用 RabbitMQ 做同步ES的解决方案。
步骤:
1.我们要首先了解,数据库在做增删改时我们需要向ES提交同步任务,其次,ES索引库的增和改其实是相同的含义,所以我们可以把增和改作为一个任务。
2.定义好RabbitMQ的处理过程
在RabbitMQ中我们需要
- 创建一个交换
- 创建一个用于处理增和改的队列,并绑定到交换机中
- 创寻一个用于删除的队列,并绑定到交换机中
- 在ES服务中创建一个消费者,用于处理增和改的任务
- 在ES服务中创建一个消费者,用于处理删除的任务
- 当admin服务发生增删改操作时,向RabbitMQ中发送对应的任务到相应的队列中。
具体声明代码如下:
- @Configuration
- public class RabbitConfig {
-
- /**
- * 创建一个交换机
- * @return
- */
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("hotel.topic");
- }
-
- /**
- * 创建一个用于处理增加和修改的队列
- */
- @Bean
- public Queue insertOrUpdateQueue(){
- return new Queue("hotel.insert.queue");
- }
-
- /**
- * 创寻一个用于删除的队列
- * @return
- */
- @Bean
- public Queue deleteQueue(){
- return new Queue("hotel.delete.queue");
- }
-
- /**
- * 绑定两个队列到交换机中
- */
- @Bean
- public Binding insertAndUpdateBinding(
- @Qualifier("topicExchange") TopicExchange exchange,
- @Qualifier("insertOrUpdateQueue") Queue queue
- ){
- return BindingBuilder.bind(queue).to(exchange).with("hotel.insert.key");
- }
-
- @Bean
- public Binding deleteBinding(
- @Qualifier("topicExchange") TopicExchange exchange,
- @Qualifier("deleteQueue") Queue queue
- ){
- return BindingBuilder.bind(queue).to(exchange).with("hotel.delete.key");
- }
- }
3.在ES服务中定义队列的处理方法:
- @Component
- public class RabbitHandler {
-
-
- @RabbitListener(queues = "hotel.insert.queue")
- public void insertHandler(Message message){
-
- }
-
- @RabbitListener(queues = "hotel.delete.queue")
- public void deleteHandler(Message message){
-
- }
- }
集群
单机的ElasticSearch做数据存储,必然面临两个问题,海量数据存储问题,和单点故障问题。
对于海量数据问题,将索引库从逻辑上拆分为N个分版(shard),存储到多个节点
对于单点故障问题,将分代数据在不同节点备份(replica)
ES集群搭建


共有 0 条评论