【SpringCloud——Elasticsearch(下)】

一、数据聚合

聚合,可以实现对文档数据的统计、分析、运算。常见的聚合有三类:

①、桶聚合:用来对文档做分组

  • TermAggregation:按照文档字段值分组。
  • Date Histogram:按照日期解题分组,例如一周为一组,或者一月为一组。

②、度量聚合:用以计算一些值,例如:最大值、最小值、平均值等

  • Avg:求平均
  • Max:求最大
  • Min:求最小
  • Stats:同时求最大、最小、平均、合计等

③、管道聚合:其他聚合的结果为基础做聚合

1、桶聚合

GET /hotel/_search
{"size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果"aggs": { // 定义聚合"brandAgg": { //给聚合起个名字"terms": { // 聚合的类型,按照品牌值聚合,所以选择term"field": "brand", // 参与聚合的字段"size": 20 // 希望获取的聚合结果数量}}}
}

①对聚合结果进行排序:

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。

#聚合功能,自定义展示排序规则
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"_count": "asc"}}}}
}

 ②、限定聚合范围

#聚合功能,限定聚合范围
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}},"query": {"range": {"price": {"lte": 200}}}
}

 ③、基于RestAPI实现

    @Testvoid testAggregation() throws IOException {//1.准备SearchRequest request = new SearchRequest("hotel");//2.准备DSL//2.1、sizerequest.source().size(0);//2.2、聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));//3.发出结果SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析结果//4.1、解析聚合结果Aggregations aggregations = response.getAggregations();//4.2、很具名称获取聚合结果Terms brandAgg = aggregations.get("brandAgg");//4.3、获取桶List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();//4.4、遍历for (Terms.Bucket bucket : buckets) {String brandName = bucket.getKeyAsString();long docCount = bucket.getDocCount();System.out.println(brandName + " " + docCount);}}

 2、度量聚合

①、求每个品牌的用户评分的最小值、最大值、平均值(聚合嵌套)

GET /hotel/_search
{"size": 0, "aggs": {"brandAgg": { "terms": { "field": "brand", "size": 20},"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算"score_stats": { // 聚合名称"stats": { // 聚合类型,这里stats可以计算min、max、avg等"field": "score" // 聚合字段,这里是score}}}}}
}

 ②、在①的基础上再对显示结果进行按平均值降序排列

#度量聚合_聚合的嵌套_metric
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"scoreAgg.avg": "desc"}},"aggs": {"scoreAgg": {"stats": {"field": "score"}}}}}
}

 3、多条件聚合+带过滤条件的聚合

案例:

        在业务层定义方法,实现对品牌、城市、星级的聚合。搜索页面的品牌、城市等信息不应该是写死在页面的,而是通过聚合索引库中的酒店数据得来的。同时,例如当我们选择上海这个城市时,品牌和星级都是根据上海的酒店得来的,因此,我们需要对聚合的对象做限制,也就是我们说的加过滤条件。

    @Overridepublic Map<String, List<String>> filters(RequestParams params) {try {Map<String,List<String>> result = new HashMap<>();List<String> list = new ArrayList<>();list.add("brand");list.add("city");list.add("starName");for (String s : list) {//1.准备RequestSearchRequest request = new SearchRequest("hotel");//2.准备DSL//2.1、设置sizerequest.source().size(0);//2.2、聚合request.source().aggregation(AggregationBuilders.terms(s + "Agg").field(s).size(20));//2.3、查询信息BoolQueryBuilder boolQuery = buildBasicQuery(params);request.source().query(boolQuery);//3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析结果List<String> res = storageResult(response,s);if (s.equals("city")){result.put("city",res);}else if (s.equals("brand")){result.put("brand",res);}else {result.put("starName",res);}}return result;}catch (Exception e){throw new RuntimeException(e);}}private List<String> storageResult(SearchResponse response,String aggName) {List<String> result = new ArrayList<>();//4.1、解析聚合结果Aggregations aggregations = response.getAggregations();//4.2、很具名称获取聚合结果Terms brandAgg = aggregations.get(aggName + "Agg");//4.3、获取桶List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();//4.4、遍历for (Terms.Bucket bucket : buckets) {String value = bucket.getKeyAsString();result.add(value);}return result;}

二、拼音分词器以及自动补全查询

#自定义拼音分词器
PUT /test
{"settings": {"analysis": {"analyzer": { //自定义分词器"my_analyzer": { //分词器名称"tokenizer": "ik_max_word","filter": "py"}},"filter": { //自定义tokenizer filter"py": { //过滤器名称"type": "pinyin",//过滤器类型,这里是拼音"keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name":{"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}

 案例:酒店系统实现搜索自动补全功能

1、修改索引库数据结构

PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}

2、导入数据到索引库

①、修改实体类

@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;private String brand;private String city;private String starName;private String business;private String location;private String pic;private Object distance;private Boolean isAD;private List<String> suggestion;public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();if (this.business.contains("、") || this.business.contains("/")){//business有多个值,需要切割String[] arr = new String[10];if (business.contains("、")){arr = this.business.split("、");}else {arr = this.business.split("/");}//添加元素this.suggestion = new ArrayList<>();this.suggestion.add(this.brand);Collections.addAll(this.suggestion,arr);}else {this.suggestion = Arrays.asList(this.brand,this.business);}}
}

②、导入

    //批量新增文档数据@Testvoid testBulkRequest() throws IOException {//1.创建RequestBulkRequest request = new BulkRequest();//2.准备Json文档//批量查询酒店数据List<Hotel> list =  hotelService.list();for (int i = 0; i < list.size(); i++) {HotelDoc hotelDoc = new HotelDoc(list.get(i));request.add(new IndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc),XContentType.JSON));}//3.发送请求client.bulk(request,RequestOptions.DEFAULT);}

3、接口编写

①、controller

    @GetMapping("/suggestion")public List<String> getSuggestion(@RequestParam("key") String prefix){return hotelService.getSuggestion(prefix);}

②、service

    @Overridepublic List<String> getSuggestion(String prefix) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("mySuggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));//3、发起请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据名称获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("mySuggestions");//4.2、获取options并遍历List<String> result = new ArrayList<>();for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {//4.3、获取一个option中的text,也就是补全的此条String text = option.getText().string();result.add(text);}return result;} catch (Exception e) {throw new RuntimeException(e);}}

三、Es数据与MySQL数据同步

一旦用户对数据库的数据进行了增删改操作后,Es将如何感知到数据发生了变化并进行同步呢?我们有如下几种方案可供我们解决上述问题:

  • 异步通知,业务层修改时调用MQ将修改信息发送至队列当中,由ES同步端获取该信息并将更新的数据同步到索引库中。
  • 同步更新,业务层修改完成后调用ES同步端的接口,将修改信息发送给该接口,由该接口执行数据同步操作。
  • 开启MySQL的binlog并进行监听,一旦数据库当中的数据发生变化,ES同步端则通过监听binlog得知变更信息,之后ES进行数据同步。

针对以上三种,其优缺点如下:

        

 此处我们采用异步通知的方式进行数据同步:

1、声明交换机、队列、routing_key

public class MqConstants {/*** 交换机*/public static final String EXCHANGE_NAME = "hotel.topic";/*** 监听数据新增队列(包括新增和修改)*/public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";/*** 新增队列routing_key*/public static final String INSERT_ROUTING_KEY = "hotel.insert";/*** 监听数据删除队列*/public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";/*** 删除队列routing_key*/public static final String DELETE_ROUTING_KEY = "hotel_delete";}
@Configuration
public class MqConfig {//声明交换机@Bean("exchange")public TopicExchange hotelExchange(){return new TopicExchange(EXCHANGE_NAME);}//声明新增队列@Bean("insertQueue")public Queue insertQueue(){return QueueBuilder.durable(INSERT_QUEUE_NAME).build();}//声明删除队列@Bean("deleteQueue")public Queue deleteQueue(){return QueueBuilder.durable(DELETE_QUEUE_NAME).build();}//新增队列和交换机绑定@Beanpublic Binding insertQueueBinding(@Qualifier("exchange") TopicExchange topicExchange,@Qualifier("insertQueue") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with(INSERT_ROUTING_KEY);}//删除队列和交换机绑定@Beanpublic Binding deleteQueueBinding(@Qualifier("exchange") TopicExchange topicExchange,@Qualifier("deleteQueue") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with(DELETE_ROUTING_KEY);}
}

2、处理变更消息发送

    @PostMappingpublic void saveHotel(@RequestBody Hotel hotel){Long id = hotel.getId();if (id == null){buildHotelId.init();id = Long.valueOf(buildHotelId.hotelId);hotel.setId(id);}hotelService.save(hotel);rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_ROUTING_KEY,hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_ROUTING_KEY,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.DELETE_ROUTING_KEY,id);}

3、消息监听+ES数据同步

@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 监听新增或修改信息* @param id*/@RabbitListener(queues = MqConstants.INSERT_QUEUE_NAME)public void insertData(Long id){hotelService.insertById(id);}/*** 监听删除信息* @param id*/@RabbitListener(queues = MqConstants.DELETE_QUEUE_NAME)public void deleteData(Long id){hotelService.deleteById(id);}
}
    @Overridepublic void insertById(Long id) {try {Hotel hotel = getById(id);HotelDoc doc = new HotelDoc(hotel);IndexRequest request = new IndexRequest("hotel").id(doc.getId().toString());request.source(JSON.toJSONString(doc), XContentType.JSON);client.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {try {DeleteRequest request = new DeleteRequest("hotel", String.valueOf(id));client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}

四、ES集群部署

0、集群结构介绍

1、集群搭建

①、创建docker-compose.yml文件

version: '2.2'
services:es01:image: elasticsearch:7.12.1container_name: es01environment:- node.name=es01- cluster.name=es-docker-cluster- discovery.seed_hosts=es02,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data01:/usr/share/elasticsearch/dataports:- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

②、es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

③、添加如下内容

vm.max_map_count=262144

④、执行命令,使配置生效

sysctl -p

⑤、通过docker-compose启动集群

docker-compose up -d

2、集群监控

①、启动监控程序

 

②、浏览器查看WEB界面

localhost:9000

3、集群职责及脑裂

①、集群职责

Ⅰ、master eligible节点的作用是什么?

  • 参与集群选主
  • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求

Ⅱ、data节点的作用是什么?

  • 数据的CRUD     

Ⅲ、coordinator节点的作用是什么?

  • 路由请求到其它节点
  • 合并查询到的结果,返回给用户

②、脑裂

        默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。 为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。

4、分布式新增和查询

 新增流程:

 查询流程:

 

5、ES故障转移

假设有三个节点,node1、node2、node3,node1此时为主节点,node2和node3为备选节点,当node1节点发生故障宕机时,node2节点和node3节点就会进行主节点选举,选举出新的主节点,假设我们的node2节点当选了主节点,此时它就会去检查集群当中分片的状态,参考上图,我们可以看到分片一和分片二的主分片都在,备份分片2和备份分片0也都在,那就缺少了主分片0和备份分片1,此时主节点node2就会把node1节点当中的分片迁移到node2和node3上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/595.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux--12--Kernel panic – not syncing: Attempted to kill init! Redhat6.2故障修复

1、Linux环境 Redhat6.2&#xff0c; Linux version 2.6.32-220.el6.i686 (mockbuildx86-003.build.bos.redhat.com) (gccversion 4,4.5 20110214 (Red Hat 4.4.5-6) (GCC) ) #1 SMP Wed Nov 9 08:02:18 EST 2011 2、故障现象 修改TCP相关文件&#xff0c;重启完Linux系统之…

LLM-Client一个轻量级的LLM集成工具

大型语言模型(llm)已经彻底改变了我们与文本交互的方式&#xff0c;OpenAI、Google、AI21、HuggingfaceHub、Anthropic和众多开源模型提供了不同的功能和优势。但是每个模型都有其独特的体系结构、api和兼容性需求&#xff0c;集成这些模型是一项耗时且具有挑战性的任务。 所以…

全网超详细的 SpringBoot 整合 Elasticsearch 实战教程

在pom.xml里加入如下依赖&#xff1a; <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency> 非常重要&#xff1a;检查依赖版本是否与你…

使用 MATLAB 和 C/C++ 生成 GPS L1 C/A 伪随机噪声 (PRN) 代码

function CACode = GpsCaCodePRN(sv)NUM_CODES = 37; %reserving 37 satellitesSR_LEN = 20;CA_PERIOD = 1023

3-如何创建等比数列?【视频版】

目录 问题视频解答 问题 视频解答 点击观看&#xff1a; 3-如何创建等差数列&#xff1f;

利用SQL注入漏洞登录后台

所谓SQL注入&#xff0c;就是通过把SQL命令插入到Web表单递交或输入域名或页面请求的查询字符串&#xff0c;最终达到欺骗服务器执行恶意的SQL命令&#xff0c;比如先前的很多影视网站泄露VIP会员密码大多就是通过WEB表单递交查询字符暴出的&#xff0c;这类表单特别容易受到SQ…

(二)WPF - 应用程序

一、运行程序的过程&#xff1a; &#xff08;1&#xff09; Application 对象被构造出来。&#xff08;2&#xff09;Run方法被调用。&#xff08;3&#xff09;Application.Startup 事件被触发&#xff08;4&#xff09;用户代码构造一个或多个 Window 对象。&#xff08;5&…

Finalshell安全吗?Xshell怎么样?

文章目录 一、我的常用ssh连接工具二、Xshell2.1 下载&#xff1a;认准官网2.2 Xshell 配置2.3 Xftp和WinSCP 一、我的常用ssh连接工具 之前讲过&#xff1a; 【服务器】远程连接选SSH&#xff08;PUTTY、Finalshell、WinSCP&#xff09; 还是 远程桌面&#xff08;RDP、VNC、…

使用POI将excel文件导入到数据库

概要 随着时代变化&#xff0c;有的需求也会越来越多&#xff0c;例如&#xff1a;有的文件上千条数据需要导入数据库不可能手动一条条导入吧&#xff1f;太浪费时间了&#xff01;所以需要编写程序让程序来导入 整体架构流程 我这里使用的是springbootmybatisMVC的项目架构…

ElasticSearch

title: ElasticSearch author: zed 一、引言 1.1 海量数据 在海量数据中执行搜索功能时&#xff0c;如果使用MySQL&#xff0c;效率太低。 1.2 全文检索 在海量数据中执行全文搜索时&#xff0c;如果使用MySQL&#xff0c;效率太低。 1.3 高亮显示 将搜索关键字&#xff0c;以…

深度解读 Android 14 重要的 8 个新特性~

一年一度的 Android 升级永不缺席&#xff0c;今年的代号叫 Upside Down Cake&#xff0c;倒置蛋糕&#xff0c;简称 U&#xff0c;对外版本为 Android 14。 一般来说&#xff0c;升级任务分为 ROM 角度和 App 角度&#xff0c;前者比较关心系统内部实现的变化&#xff0c;后者…

Mac电脑硬件/软件运行状况查看工具

iStat Menus是一款系统监控和管理工具&#xff0c;旨在帮助Mac用户实时监控电脑的各项硬件和软件信息。它以直观和定制化的方式提供了丰富的系统状态指标&#xff0c;让用户能够全面了解和管理自己的Mac电脑。 iStat Menus提供了一系列的菜单栏指示项目&#xff0c;可以显示诸如…