分布式搜索引擎(elasticsearch) 3/3章 | 您所在的位置:网站首页 › 公章的读音 › 分布式搜索引擎(elasticsearch) 3/3章 |
目录 数据聚合: DSL实现Bucket聚合: RestAPI实现聚合(与上面DSL方法一样) DSL实现度量(Metrics)和管道(pipeline)聚合: RestAPI实现聚合(与上面DSL方法一样) 自动补全 拼音分词器 自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器)) 现在想根据词来分拼音 自定义分词器最终结果 因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用 语法: 自动补全查询 数据同步: 数据同步思路分析 方案一:同步调用 方案二:异步通知 方案三:监听binlog 实现elasticsearch与数据库数据同步(异步通知) 引入依赖 配置mq地址 声明常量 声明exchange、queue、RoutingKey(有基于注解和基于bean) 在admin中的增、删、改业务中完成消息发送 在demo中完成消息监听,并更新elasticsearch中数据 es集群: 搭建ES集群(创建三台es服务成一个集群) 搭建es集群 集群状态监控 创建索引库(分片) 查看分片效果 集群节点角色 集群脑裂问题 集群分布式存储 elasticsearch会通过hash算法来计算文档应该存储到哪个分片: 说明: 新增文档流程: 编辑 集群分布式查询 集群故障转移 数据聚合:聚合常见的有三类: 1.桶(Bucket)聚合:用来对文档做分组 TermAggregation:按照文档字段值分组 Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组 2.度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等 avg:求平均值 max:求最大值 min:求最小值 stats:同时求max、min、avg、sum等 3.管道(pipeline)聚合:其它聚合的结果为基础做聚合 注意:参与聚合的字段类型必须是 keyword 数值 日期 布尔 DSL实现Bucket聚合:统计价格小于200的品牌(brand)并且按照统计的文档数量升序有几种 GET /hotel/_search { "query": { //条件查询 "range": { "price": { "lte": 200 // 只对200元以下的文档聚合 } } }, "size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果 "aggs": { // 定义聚合 "brandAgg": { //给聚合起个名字 "terms": { // 聚合的类型,按照品牌值聚合,所以选择term "field": "brand", // 参与聚合的字段 "size": 20 // 希望获取的聚合结果数量 "order": [ //聚合结果排序 按照_count降序 多条件排序 单条件 把中括号去掉即可 { "_count": "asc"}, // 按照_count升序(聚合会统计Bucket内的文档数量,记为_count) { "_key": "desc"}, ], } } } } RestAPI实现聚合(与上面DSL方法一样) // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); //条件 request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合 //排序条件 BucketOrder bucketOrder = BucketOrder.count(false); //这里的count方法中true表示升序排列,false代表降序排列 BucketOrder keyOrder = BucketOrder.key(false); List bucketOrders=new ArrayList(); bucketOrders.add(bucketOrder); bucketOrders.add(keyOrder); //聚合 request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果 request.source().aggregation( AggregationBuilders .terms("brandAgg") //聚合的类型,brandAgg聚合名字 .field("brand") //参与聚合的字段 .size(20) .order(bucketOrders)); //希望获取的聚合结果数量 // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 .......总结; aggs代表聚合,与query同级,此时query的作用是? 限定聚合的的文档范围 聚合必须的三要素: 聚合名称 聚合类型 聚合字段 聚合可配置属性有: size:指定聚合结果数量 order:指定聚合结果排序方式 field:指定聚合字段 DSL实现度量(Metrics)和管道(pipeline)聚合:要求获取每个品牌的用户评分(score)的min、max、avg等值 GET /hotel/_search { "size": 0, "aggs": {//第一次聚合 根据品牌聚合 "brandAgg": { "terms": { "field": "brand", "size": 20 }, "aggs": { // 是brandAgg聚合的子聚合,也就是分组后对每组分别计算(管道) "score_stats": { // 聚合名称 "stats": { // 聚合类型,这里stats可以计算min、max、avg等(度量) "field": "score" // 聚合字段,这里是score } } } } } } RestAPI实现聚合(与上面DSL方法一样) // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); //条件 request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合 //度量(Metrics)和管道(pipeline)聚合 StatsAggregationBuilder score = AggregationBuilders.stats("score"); //聚合 request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果 request.source().aggregation( AggregationBuilders .terms("brandAgg") //聚合的类型,brandAgg聚合名字 .field("brand") //参与聚合的字段 .size(20) //希望获取的聚合结果数量 .subAggregation(score)); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 ....... 自动补全 拼音分词器要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件 地址 安装方式与IK分词器一样,分三步: 1 解压 2 上传到虚拟机中,elasticsearch的plugin目录 3 重启elasticsearch 测试 测试 POST /_analyze { "text": "如家酒店", "analyzer": "pinyin" } 结果就是 每一个字都有拼音 不是词 自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器)) 现在想根据词来分拼音elasticsearch中分词器(analyzer)的组成包含三部分: character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符 tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等 自定义分词器最终结果创建倒排索引时: 搜索时,用户搜索“狮子”: PUT /test { "settings": { "analysis": { "analyzer": { // 自定义分词器 "my_analyzer": { // 分词器名称 "tokenizer": "ik_max_word", "filter": "py" //下面filter定义的py } }, "filter": { // 自定义tokenizer filter 配置 "py": { // 过滤器名称 "type": "pinyin", // 过滤器类型,这里是pinyin "keep_full_pinyin": false, //不单个字分词 "keep_joined_full_pinyin": true, //全拼 "keep_original": true, //保留中文 "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { //创建倒排索引时用my_analyzer分词器;字段在搜索时用ik_smart分词器 "properties": { "name": { "type": "text", "analyzer": "my_analyzer",//创建 "search_analyzer": "ik_smart" //搜索 } } } } 自动补全查询elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回 为了提高补全查询的效率,对于文档中字段的类型有一些约束: 参与补全查询的字段必须是completion类型 字段的内容一般是用来补全的多个词条形成的数组 例子: // 创建索引库 PUT test { "mappings": { "properties": { "title":{ "type": "completion" } } } } // 示例数据 POST test/_doc { "title": ["Sony", "WH-1000XM3"] } POST test/_doc { "title": ["SK-II", "PITERA"] } POST test/_doc { "title": ["Nintendo", "switch"] } // 自动补全查询 GET /test/_search { "suggest": { "title_suggest": {//取名 "text": "s", // 关键字 "completion": { "field": "title", // 补全查询的字段 "skip_duplicates": true, // 跳过重复的 "size": 10 // 获取前10条结果 } } } } RestAPI自动补全查询(与上面DSL查询结果一样) String key="s"; // 1.准备请求 SearchRequest request = new SearchRequest("test"); // 2.请求参数 request.source().suggest(new SuggestBuilder() .addSuggestion( "title_suggest",//取名 SuggestBuilders .completionSuggestion("title")//补全查询的字段 .size(10) // 获取前10条结果 .skipDuplicates(true) // 跳过重复的 .prefix(key)//关键字 )); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析....... 数据同步: 数据同步思路分析 方案一:同步调用优点:实现简单,粗暴 缺点:业务耦合度高 优点:低耦合,实现难度一般 缺点:依赖mq的可靠性 优点:完全解除服务间耦合 缺点:开启binlog增加数据库负担、实现复杂度高 基于注解(在demo方声明) @Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.INSERT_KEY )) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.DELETE_KEY )) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } }基于bean(在demo方声明) @Configuration public class MqConfig { @Bean public TopicExchange topicExchange(){//定义交换机 return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); //交换机名字 是否持久化 是否自动删除 } @Bean public Queue insertQueue(){//定义新增或修改队列 return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);//队列名字 持久化 } @Bean public Queue deleteQueue(){//定义删除队列 return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true); } @Bean public Binding insertQueueBinding(){//定义绑定关系 return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY); } @Bean public Binding deleteQueueBinding(){//定义绑定关系 return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY); } } 在admin中的增、删、改业务中完成消息发送 @RestController @RequestMapping("hotel") public class HotelController { @Autowired private IHotelService hotelService; //对数据库的接口 @Autowired private RabbitTemplate rabbitTemplate; @PostMapping public void saveHotel(@RequestBody Hotel hotel){ // 新增酒店 hotelService.save(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id); } } 在demo中完成消息监听,并更新elasticsearch中数据 注解方法 @Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.INSERT_KEY )) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.DELETE_KEY )) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } } //-------------------------------------------------------------- bean方式 @Component public class HotelListener { @Autowired private IHotelService hotelService;//对es的增 删操作 @RabbitListener(queues=HotelMqConstants.INSERT_QUEUE_NAME) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(queues=HotelMqConstants.DELETE_QUEUE_NAME) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } } //-------------------------------------------------------------- @Slf4j @Service public class HotelService extends ServiceImpl implements IHotelService { @Autowired private RestHighLevelClient restHighLevelClient; @Override public void deleteById(Long hotelId) { try { // 1.创建request DeleteRequest request = new DeleteRequest("hotel", hotelId.toString()); // 2.发送请求 restHighLevelClient.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("删除酒店数据失败", e); } } @Override public void saveById(Long hotelId) { try { // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查) Hotel hotel = getById(hotelId); // 转换 HotelDoc hotelDoc = new HotelDoc(hotel); // 1.创建Request IndexRequest request = new IndexRequest("hotel").id(hotelId.toString()); // 2.准备参数 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("新增酒店数据失败", e); } } } es集群: 搭建ES集群(创建三台es服务成一个集群)部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间 搭建es集群首先编写一个docker-compose文件,内容如下: version: '2.2' services: es01: image: elasticsearch:7.12.1 //镜像 container_name: es01 //容器名称 environment: //环境变量 - node.name=es01 //节点名称 - cluster.name=es-docker-cluster //集群名称 集群名称一样 es就会自动组装成一个集群(其它节点必须一致) - discovery.seed_hosts=es02,es03//另外两个节点的Ip地址(因为用的docker docker可以通过容器名互联) - cluster.initial_master_nodes=es01,es02,es03 //初始化的主节点 主从 这三个节点里选举 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" //堆内存大小 volumes: //数据卷 - data01:/usr/share/elasticsearch/data ports: //docker映射 - 9200:9200 networks: - elastic es02: image: elasticsearch:7.12.1 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.12.1 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件 vi /etc/sysctl.conf 添加下面的内容: vm.max_map_count=262144 然后执行命令,让配置生效: sysctl -p 通过docker-compose启动集群: docker-compose up -d 集群状态监控使用cerebro来监控es集群状态,官方网址:GitHub - lmenezes/cerebro 解压即可使用,非常方便。 解压好的目录如下: 进入对应的bin目录: 双击其中的cerebro.bat文件即可启动服务。 访问http://localhost:9000 即可进入管理界面: 输入你的elasticsearch的任意节点的地址和端口,点击connect即可: 绿色的条,代表集群处于绿色(健康状态)。 创建索引库(分片)注意:每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。 利用kibana的DevTools创建索引库 1)在DevTools中输入指令: PUT /itcast { "settings": { "number_of_shards": 3, // 分片数量 "number_of_replicas": 1 // 副本数量 }, "mappings": { "properties": { // mapping映射定义 ... } } } 2)或利用cerebro创建索引库 填写索引库信息: 点击右下角的create按钮即可 查看分片效果回到首页,即可查看索引库分片效果: 节点类型 配置参数 默认值 节点职责 master eligible node.master true 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求 data node.data true 数据节点:存储数据、搜索、聚合、CRUD ingest node.ingest true 数据存储之前的预处理 coordinating 上面3个参数都为false则为coordinating节点 无 路由请求到其它节点 合并其它节点处理的结果,返回给用户 默认情况下节点同时具备这四种角色 elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。(三台coordinating节点 五台data节点 三台备选主节点) 例: 有A B C 三台es服务组成集群 A为主 当因为B C因为网络阻塞与A无法通信时 但A又能去客户端通信,这时B C就会在他们中选举一个主节点 这时集群里就有了两个主节点 解决方案: 为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题 集群分布式存储 elasticsearch会通过hash算法来计算文档应该存储到哪个分片:shard = hash(_routing) % number_of_shards 说明:_routing默认是文档的id 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改! 新增文档流程:![]() elasticsearch的查询分成两个阶段 scatter phase:分散阶段,coordinating node会把请求分发到每一个分片 gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结 果集返回给用户 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。 当node1宕机后:星星代表主节点 红色代表宕机 |
CopyRight 2018-2019 实验室设备网 版权所有 |