分布式搜索引擎(elasticsearch) 3/3章 您所在的位置:网站首页 公章的读音 分布式搜索引擎(elasticsearch) 3/3章

分布式搜索引擎(elasticsearch) 3/3章

2023-06-14 04:37| 来源: 网络整理| 查看: 265

目录

数据聚合:

DSL实现Bucket聚合:

RestAPI实现聚合(与上面DSL方法一样)

DSL实现度量(Metrics)和管道(pipeline)聚合:

RestAPI实现聚合(与上面DSL方法一样)

自动补全

拼音分词器

自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器))

现在想根据词来分拼音

自定义分词器最终结果

因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用

 语法:

自动补全查询

数据同步:

数据同步思路分析

方案一:同步调用

 方案二:异步通知

方案三:监听binlog

实现elasticsearch与数据库数据同步(异步通知)

引入依赖

配置mq地址

声明常量

声明exchange、queue、RoutingKey(有基于注解和基于bean)

在admin中的增、删、改业务中完成消息发送

在demo中完成消息监听,并更新elasticsearch中数据

es集群:

搭建ES集群(创建三台es服务成一个集群)

搭建es集群

集群状态监控

创建索引库(分片)

查看分片效果

集群节点角色

 集群脑裂问题

集群分布式存储

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

说明:

新增文档流程:

​编辑

集群分布式查询

集群故障转移

数据聚合:

聚合常见的有三类:

        1.桶(Bucket)聚合:用来对文档做分组                 TermAggregation:按照文档字段值分组                 Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组

        2.度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等                     avg:求平均值                     max:求最大值                     min:求最小值                     stats:同时求max、min、avg、sum等

         3.管道(pipeline)聚合:其它聚合的结果为基础做聚合

注意:参与聚合的字段类型必须是 keyword 数值 日期 布尔

DSL实现Bucket聚合:

统计价格小于200的品牌(brand)并且按照统计的文档数量升序有几种

GET /hotel/_search {

  "query": { //条件查询     "range": {       "price": {         "lte": 200 // 只对200元以下的文档聚合       }     }   },         "size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果   "aggs": { // 定义聚合     "brandAgg": { //给聚合起个名字       "terms": { // 聚合的类型,按照品牌值聚合,所以选择term         "field": "brand", // 参与聚合的字段         "size": 20 // 希望获取的聚合结果数量

        "order": [  //聚合结果排序 按照_count降序 多条件排序 单条件 把中括号去掉即可

                { "_count": "asc"}, // 按照_count升序(聚合会统计Bucket内的文档数量,记为_count)

                { "_key": "desc"},         ],       }     }   } }

RestAPI实现聚合(与上面DSL方法一样) // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); //条件 request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合 //排序条件 BucketOrder bucketOrder = BucketOrder.count(false); //这里的count方法中true表示升序排列,false代表降序排列 BucketOrder keyOrder = BucketOrder.key(false); List bucketOrders=new ArrayList(); bucketOrders.add(bucketOrder); bucketOrders.add(keyOrder); //聚合 request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果 request.source().aggregation( AggregationBuilders .terms("brandAgg") //聚合的类型,brandAgg聚合名字 .field("brand") //参与聚合的字段 .size(20) .order(bucketOrders)); //希望获取的聚合结果数量 // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 .......

总结;

aggs代表聚合,与query同级,此时query的作用是?         限定聚合的的文档范围 聚合必须的三要素:         聚合名称         聚合类型         聚合字段 聚合可配置属性有:         size:指定聚合结果数量         order:指定聚合结果排序方式         field:指定聚合字段

DSL实现度量(Metrics)和管道(pipeline)聚合:

要求获取每个品牌的用户评分(score)的min、max、avg等值

GET /hotel/_search {   "size": 0,    "aggs": {//第一次聚合 根据品牌聚合     "brandAgg": {        "terms": {          "field": "brand",          "size": 20       },       "aggs": { // 是brandAgg聚合的子聚合,也就是分组后对每组分别计算(管道)         "score_stats": { // 聚合名称           "stats": { // 聚合类型,这里stats可以计算min、max、avg等(度量)             "field": "score" // 聚合字段,这里是score           }         }       }     }   } }

RestAPI实现聚合(与上面DSL方法一样) // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); //条件 request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合 //度量(Metrics)和管道(pipeline)聚合 StatsAggregationBuilder score = AggregationBuilders.stats("score"); //聚合 request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果 request.source().aggregation( AggregationBuilders .terms("brandAgg") //聚合的类型,brandAgg聚合名字 .field("brand") //参与聚合的字段 .size(20) //希望获取的聚合结果数量 .subAggregation(score)); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 ....... 自动补全 拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件  地址

安装方式与IK分词器一样,分三步:

1 解压

2 上传到虚拟机中,elasticsearch的plugin目录

3 重启elasticsearch 测试

测试

POST /_analyze {   "text": "如家酒店",   "analyzer": "pinyin" }

结果就是 每一个字都有拼音  不是词

自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器)) 现在想根据词来分拼音

elasticsearch中分词器(analyzer)的组成包含三部分:

character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符 tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

自定义分词器最终结果

因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用

创建倒排索引时:

搜索时,用户搜索“狮子”:

 语法:

PUT /test {   "settings": {     "analysis": {       "analyzer": { // 自定义分词器         "my_analyzer": {  // 分词器名称           "tokenizer": "ik_max_word",           "filter": "py"  //下面filter定义的py         }       },       "filter": { // 自定义tokenizer filter  配置         "py": { // 过滤器名称           "type": "pinyin", // 过滤器类型,这里是pinyin           "keep_full_pinyin": false, //不单个字分词           "keep_joined_full_pinyin": true, //全拼           "keep_original": true, //保留中文           "limit_first_letter_length": 16,           "remove_duplicated_term": true,           "none_chinese_pinyin_tokenize": false         }       }     }   },

"mappings": { //创建倒排索引时用my_analyzer分词器;字段在搜索时用ik_smart分词器     "properties": {       "name": {         "type": "text",         "analyzer": "my_analyzer",//创建         "search_analyzer": "ik_smart"  //搜索       }     }   } }

自动补全查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回

为了提高补全查询的效率,对于文档中字段的类型有一些约束:

        参与补全查询的字段必须是completion类型

        字段的内容一般是用来补全的多个词条形成的数组

例子:

// 创建索引库

PUT test {   "mappings": {     "properties": {       "title":{         "type": "completion"       }     }   } }

// 示例数据 POST test/_doc {   "title": ["Sony", "WH-1000XM3"] } POST test/_doc {   "title": ["SK-II", "PITERA"] } POST test/_doc {   "title": ["Nintendo", "switch"] }

// 自动补全查询 GET /test/_search {   "suggest": {     "title_suggest": {//取名       "text": "s", // 关键字       "completion": {         "field": "title", // 补全查询的字段         "skip_duplicates": true, // 跳过重复的         "size": 10 // 获取前10条结果       }     }   } }

RestAPI自动补全查询(与上面DSL查询结果一样)

String key="s"; // 1.准备请求 SearchRequest request = new SearchRequest("test"); // 2.请求参数 request.source().suggest(new SuggestBuilder() .addSuggestion( "title_suggest",//取名 SuggestBuilders .completionSuggestion("title")//补全查询的字段 .size(10)  // 获取前10条结果 .skipDuplicates(true) // 跳过重复的 .prefix(key)//关键字 )); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析....... 数据同步: 数据同步思路分析 方案一:同步调用

优点:实现简单,粗暴   

缺点:业务耦合度高

 方案二:异步通知

优点:低耦合,实现难度一般

缺点:依赖mq的可靠性

方案三:监听binlog

优点:完全解除服务间耦合

缺点:开启binlog增加数据库负担、实现复杂度高

实现elasticsearch与数据库数据同步(异步通知) 引入依赖 org.springframework.boot spring-boot-starter-amqp 配置mq地址 spring: rabbitmq: host: 192.168.150.101 port: 5672 username: admmin password: 123456 virtual-host: / #虚拟主机 声明常量 public class HotelMqConstants { public static final String EXCHANGE_NAME = "hotel.topic"; //交换机名称 public static final String INSERT_QUEUE_NAME = "hotel.insert.queue"; //新增队列 public static final String DELETE_QUEUE_NAME = "hotel.delete.queue"; //删除队列 public static final String INSERT_KEY = "hotel.insert"; //新增RoutingKey public static final String DELETE_KEY = "hotel.delete"; //删除RoutingKey } 声明exchange、queue、RoutingKey(有基于注解和基于bean)

基于注解(在demo方声明)

@Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.INSERT_KEY )) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.DELETE_KEY )) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } }

基于bean(在demo方声明)

@Configuration public class MqConfig { @Bean public TopicExchange topicExchange(){//定义交换机 return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); //交换机名字 是否持久化 是否自动删除 } @Bean public Queue insertQueue(){//定义新增或修改队列 return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);//队列名字 持久化 } @Bean public Queue deleteQueue(){//定义删除队列 return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true); } @Bean public Binding insertQueueBinding(){//定义绑定关系 return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY); } @Bean public Binding deleteQueueBinding(){//定义绑定关系 return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY); } } 在admin中的增、删、改业务中完成消息发送 @RestController @RequestMapping("hotel") public class HotelController { @Autowired private IHotelService hotelService; //对数据库的接口 @Autowired private RabbitTemplate rabbitTemplate; @PostMapping public void saveHotel(@RequestBody Hotel hotel){ // 新增酒店 hotelService.save(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id); } } 在demo中完成消息监听,并更新elasticsearch中数据 注解方法 @Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.INSERT_KEY )) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.DELETE_KEY )) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } } //-------------------------------------------------------------- bean方式 @Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(queues=HotelMqConstants.INSERT_QUEUE_NAME) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(queues=HotelMqConstants.DELETE_QUEUE_NAME) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } } //-------------------------------------------------------------- @Slf4j @Service public class HotelService extends ServiceImpl implements IHotelService { @Autowired private RestHighLevelClient restHighLevelClient; @Override public void deleteById(Long hotelId) { try { // 1.创建request DeleteRequest request = new DeleteRequest("hotel", hotelId.toString()); // 2.发送请求 restHighLevelClient.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("删除酒店数据失败", e); } } @Override public void saveById(Long hotelId) { try { // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查) Hotel hotel = getById(hotelId); // 转换 HotelDoc hotelDoc = new HotelDoc(hotel); // 1.创建Request IndexRequest request = new IndexRequest("hotel").id(hotelId.toString()); // 2.准备参数 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("新增酒店数据失败", e); } } } es集群: 搭建ES集群(创建三台es服务成一个集群)

部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

搭建es集群

首先编写一个docker-compose文件,内容如下:

version: '2.2' services:   es01:     image: elasticsearch:7.12.1  //镜像     container_name: es01    //容器名称     environment:        //环境变量       - node.name=es01    //节点名称       - cluster.name=es-docker-cluster //集群名称 集群名称一样 es就会自动组装成一个集群(其它节点必须一致)       - discovery.seed_hosts=es02,es03//另外两个节点的Ip地址(因为用的docker docker可以通过容器名互联)       - cluster.initial_master_nodes=es01,es02,es03 //初始化的主节点 主从 这三个节点里选举       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"    //堆内存大小     volumes:    //数据卷       - data01:/usr/share/elasticsearch/data     ports:        //docker映射       - 9200:9200     networks:       - elastic   es02:     image: elasticsearch:7.12.1     container_name: es02     environment:       - node.name=es02       - cluster.name=es-docker-cluster       - discovery.seed_hosts=es01,es03       - cluster.initial_master_nodes=es01,es02,es03       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"     volumes:       - data02:/usr/share/elasticsearch/data     ports:       - 9201:9200     networks:       - elastic   es03:     image: elasticsearch:7.12.1     container_name: es03     environment:       - node.name=es03       - cluster.name=es-docker-cluster       - discovery.seed_hosts=es01,es02       - cluster.initial_master_nodes=es01,es02,es03       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"     volumes:       - data03:/usr/share/elasticsearch/data     networks:       - elastic     ports:       - 9202:9200 volumes:   data01:     driver: local   data02:     driver: local   data03:     driver: local

networks:   elastic:     driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

使用cerebro来监控es集群状态,官方网址:GitHub - lmenezes/cerebro

解压即可使用,非常方便。

解压好的目录如下:

进入对应的bin目录: 

 双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

 输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。 

创建索引库(分片)

注意:每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。

利用kibana的DevTools创建索引库

1)在DevTools中输入指令:

PUT /itcast {   "settings": {     "number_of_shards": 3, // 分片数量     "number_of_replicas": 1 // 副本数量   },   "mappings": {     "properties": {       // mapping映射定义 ...     }   } }

2)或利用cerebro创建索引库

 填写索引库信息:

点击右下角的create按钮即可

查看分片效果

回到首页,即可查看索引库分片效果:

集群节点角色

节点类型

配置参数

默认值

节点职责

master eligible

node.master

true

备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求

data

node.data

true

数据节点:存储数据、搜索、聚合、CRUD

ingest

node.ingest

true

数据存储之前的预处理

coordinating

上面3个参数都为false则为coordinating节点

路由请求到其它节点

合并其它节点处理的结果,返回给用户

默认情况下节点同时具备这四种角色

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。(三台coordinating节点  五台data节点  三台备选主节点)

 集群脑裂问题

例:

  有A B C 三台es服务组成集群 A为主 当因为B C因为网络阻塞与A无法通信时 但A又能去客户端通信,这时B C就会在他们中选举一个主节点 这时集群里就有了两个主节点

解决方案:

  为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

集群分布式存储 elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

shard = hash(_routing) % number_of_shards

说明:

_routing默认是文档的id

算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

新增文档流程:

集群分布式查询

elasticsearch的查询分成两个阶段

        scatter phase:分散阶段,coordinating node会把请求分发到每一个分片

        gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结                                   果集返回给用户

集群故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

 当node1宕机后:星星代表主节点  红色代表宕机



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有