微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下

文章目录

  • 一、数据聚合
    • 1.1 聚合种类
    • 1.2 DSL实现聚合
    • 1.3 RestAPI实现聚合
    • 1.4 演示:多条件聚合
  • 二、自动补全
    • 2.1 拼音分词器
    • 2.2 自定义分词器
    • 2.3 DSL自动补全查询
    • 2.5 实现酒店搜索框自动补全
      • 2.5.1 修改酒店索引库数据结构
      • 2.5.2 RestAPI实现自动补全查询
      • 2.5.3 实战
  • 三、数据同步
    • 3.1 实现数据同步的方法
    • 3.2 使用消息队列MQ实现数据同步
      • 3.2.1 导入hotel-admin
      • 3.2.2 声明交换机、队列、routingkey
  • 四、集群
    • 4.1 搭建ES集群
    • 4.2 集群职责和脑裂问题
    • 4.3 集群故障转移
    • 4.4 集群分布式存储与查询


一、数据聚合

1.1 聚合种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  1. 桶(Bucket)聚合:用来对文档做分组
    TermAggregation:按照文档字段值分组
    Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  2. 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    Avg:求平均值
    Max:求最大值
    Min:求最小值
    Stats:同时求max、min、avg、sum等
  3. 管道(pipeline)聚合:其它聚合的结果为基础做聚合

注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。

1.2 DSL实现聚合

# 使用DSL实现聚合
# 1.bucket桶聚合 + 限定聚合范围
# 例:根据酒店品牌名做聚合(并且限定价格不高于200的),并按照结果的升序排序,显示前5个品牌
GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}},"size": 0, //设置size为0,结果中不包含文档,只包含聚合结果"aggs": {  // 定义聚合"brandAgg": { // 定义聚合名"terms": {  // 聚合类型,按照品牌名聚合,所以选择term"field": "brand", // 参与聚合字段"order": {"_count": "asc"  //指定排序规则 升序}, "size": 20 //希望获得聚合结果数}}}
}# 2.Metrics聚合
# 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序)GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"score_stats.avg": "desc"}},"aggs": { //子聚合"score_stats": { //子聚合名"stats": {  //聚合类型,stats可以计算min、max、avg等"field": "score"  //聚合字段}}}}}
}

1.3 RestAPI实现聚合

在这里插入图片描述
在这里插入图片描述

    /*** 桶bucket聚合*/@Testvoid testAgg() throws IOException {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.sizerequest.source().size(0);// 2.2.聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析结果Aggregations aggregations = response.getAggregations();// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get("brandAgg");// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历for (Terms.Bucket bucket : buckets) {String brandName = bucket.getKeyAsString();System.out.println("brandName = " + brandName);long docCount = bucket.getDocCount();System.out.println("docCount = " + docCount);}}

1.4 演示:多条件聚合

在这里插入图片描述


@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Overridepublic Map<String, List<String>> filters() {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.sizerequest.source().size(0);// 2.2.聚合buildAggregation(request);// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1.根据品牌名称,获取聚合结果List<String> brandList = getAggByName(aggregations, "brandAgg");// 放入mapresult.put("品牌",brandList);// 4.2.根据城市名称,获取聚合结果List<String> cityList = getAggByName(aggregations, "cityAgg");// 放入mapresult.put("城市",cityList);// 4.3.根据星级名称,获取聚合结果List<String> starList = getAggByName(aggregations, "starAgg");// 放入mapresult.put("星级",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}public List<String> getAggByName(Aggregations aggregations, String aggName) {// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get(aggName);// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历List<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();brandList.add(key);}return brandList;}public void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}
}

测试

@SpringBootTest
public class HotelDemoApplicationTest {@Autowiredprivate IHotelService hotelService;@Testvoid contextLoads(){Map<String, List<String>> filters = hotelService.filters();System.out.println(filters);}
}

结果:
在这里插入图片描述

二、自动补全

自动补全如下图所示:
在这里插入图片描述

2.1 拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
    在这里插入图片描述

2.2 自定义分词器

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

演示:

# 自定义拼音分词器
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}

在这里插入图片描述

注意:拼音分词器通常在创建索引库时使用,搜索时使用普通分词器即可

2.3 DSL自动补全查询

在这里插入图片描述
查询语法如下

// 自动补全查询
POST /test/_search
{"suggest": {"title_suggest": {  // 自定义补全查询名称"text": "s", // 关键字"completion": {"field": "title", // 补全字段"skip_duplicates": true, // 跳过重复的"size": 10 // 获取前10条结果}}}
}

演示:

# 2.自动补全
# 2.1 创建一个 自动补全的索引库 属性有title
DELETE /test
PUT test
{"mappings": {"properties": {"title":{"type": "completion"}}}
}
# 2.2 插入示例数据
POST test/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{"title": ["SK-II", "PITERA"]
}
POST test/_doc
{"title": ["Nintendo", "switch"]
}# 2.3 自动补全查询
# 例:输入一个关键字s,看自动补全的结果
# 结果:"SK-II""Sony""switch"
POST /test/_search
{"suggest": {"title_suggest": {"text": "s", "completion": {"field": "title","skip_duplicates": true, "size": 10 }}}
}

结果:
在这里插入图片描述

2.5 实现酒店搜索框自动补全

2.5.1 修改酒店索引库数据结构

在这里插入图片描述

1.修改索引库结构

# 酒店数据索引库
GET /hotel/_mapping
DELETE /hotel
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}# 自动补全查询
GET /hotel/_search
{"suggest": {"mySuggestion": {"text": "shang","completion": {"field": "suggestion","skip_duplicates": true, "size": 10 }}}
}

2.修改HotelDoc

@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;private String brand;private String city;private String starName;private String business;private String location;private String pic;private Object distance; //新加加字段"距离":酒店距你选择位置的距离private Boolean isAD; //新加加字段"标记":给你置顶的酒店添加一个标记private List<String> suggestion;//新加该字段用于自动补全public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();// 自动补全字段的处理this.suggestion = new ArrayList<>();// 添加品牌、城市this.suggestion.add(this.brand);this.suggestion.add(this.city);// 判断商圈是否包含/if (this.business.contains("/")) {// business有多个值,需要切割String[] arr = this.business.split("/");// business的每个值都要加入到suggestion中Collections.addAll(this.suggestion, arr);}else{this.suggestion.add(this.business);}}
}

3.【重新导入数据,不演示,参见之前的批量导入文档功能】查询结果
在这里插入图片描述

2.5.2 RestAPI实现自动补全查询

在这里插入图片描述

在这里插入图片描述

    /*** 自动补全查询*/@Testvoid testSuggest() throws IOException {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix("s")));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsfor (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();System.out.println(str);}}

2.5.3 实战

在这里插入图片描述
Mapper层

@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@PostMapping("list")public PageResult search(@RequestBody RequestParams params) {return hotelService.search(params);}@PostMapping("filters")public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {return hotelService.filters(params);}@GetMapping("suggestion")public List<String> getSuggestion(@RequestParam("key") String key) {return hotelService.getSuggestion(key);}
}

Service层

@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 自动补全查询*/@Overridepublic List<String> getSuggestion(String key)  {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix(key)));// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsList<String> result = new ArrayList<>();for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();result.add(str);}return result;} catch (IOException e) {throw new RuntimeException(e);}}
}

结果演示
在这里插入图片描述

三、数据同步

3.1 实现数据同步的方法

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2 使用消息队列MQ实现数据同步

在这里插入图片描述

3.2.1 导入hotel-admin

3.2.2 声明交换机、队列、routingkey

在这里插入图片描述

由于增和改都相当于插入,所以共用一个队列;删除占用一个队列。

一、对消费者hotel-demo的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件
		<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
server:port: 8089
spring:datasource:url: jdbc:mysql://mysql:3306/heima?useSSL=falseusername: rootpassword: 123driver-class-name: com.mysql.jdbc.Driverrabbitmq:host: 192.168.150.101port: 5672username: itcastpassword: 123321virtual-host: /
logging:level:cn.itcast: debugpattern:dateformat: HH:mm:ss:SSS
mybatis-plus:configuration:map-underscore-to-camel-case: truetype-aliases-package: cn.itcast.hotel.pojo
  1. 定义mq的一些常量
public class HotelMqConstants {// 交换机名称public static final String EXCHANGE_NAME = "hotel.topic";// 新增修改队列public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";// 删除队列public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";// 新增修改的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";// 删除的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}
  1. 声明交换机和队列,并监听MQ消息【注解方式】
@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.INSERT_KEY))public void listenHotelInsert(Long hotelId){// 新增hotelService.saveById(hotelId);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.DELETE_KEY))public void listenHotelDelete(Long hotelId){// 删除hotelService.deleteById(hotelId);}
}

【bean方式】

@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}@Beanpublic Queue insertQueue(){return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);}@Beanpublic Queue deleteQueue(){return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);}@Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}@Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}
}
  1. RestAPI实现删改
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 搜索框查询*/@Overridepublic PageResult search(RequestParams params) {try {// 1.准备RequestSearchRequest request = new SearchRequest("hotel");// 2.准备请求参数// 2.1.多条件查询和过滤buildBasicQuery(params, request);// 2.2.分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);/*** 2.3.距离排序*/String location = params.getLocation();if (StringUtils.isNotBlank(location)) {// 不为空则查询request.source().sort(SortBuilders.geoDistanceSort("location", new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}// 3.发送请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException("搜索数据失败", e);}}/*** 复合查询*/private void buildBasicQuery(RequestParams params, SearchRequest request) {// 1.准备Boolean复合查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();/*** 1.查询关键字* must参与 算分*/// 1.1.关键字搜索,match查询,放到must中String key = params.getKey();if (StringUtils.isNotBlank(key)) {// 不为空,根据关键字查询boolQuery.must(QueryBuilders.matchQuery("all", key));} else {// 为空,查询所有boolQuery.must(QueryBuilders.matchAllQuery());}/*** 2.条件过滤:多条件复合查询* 根据 “品牌 城市 星级 价格范围” 过滤数据* filter不参与 算分*/// 1.2.品牌String brand = params.getBrand();if (StringUtils.isNotBlank(brand)) { // 不为空则查询boolQuery.filter(QueryBuilders.termQuery("brand", brand));}// 1.3.城市String city = params.getCity();if (StringUtils.isNotBlank(city)) {// 不为空则查询boolQuery.filter(QueryBuilders.termQuery("city", city));}// 1.4.星级String starName = params.getStarName();if (StringUtils.isNotBlank(starName)) {// 不为空则查询boolQuery.filter(QueryBuilders.termQuery("starName", starName));}// 1.5.价格范围Integer minPrice = params.getMinPrice();Integer maxPrice = params.getMaxPrice();if (minPrice != null && maxPrice != null) {// 不为空则查询maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));}/*** 3.算分函数查询* 置顶功能:给你置顶的酒店添加一个标记,并按其算分*/FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(boolQuery, // 原始查询,boolQuerynew FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.termQuery("isAD", true), // 过滤条件ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数)});/*** 4.设置查询条件*/request.source().query(functionScoreQuery);}/*** 结果解析*/private PageResult handleResponse(SearchResponse response) {SearchHits searchHits = response.getHits();// 4.1.总条数long total = searchHits.getTotalHits().value;// 4.2.获取文档数组SearchHit[] hits = searchHits.getHits();// 4.3.遍历List<HotelDoc> hotels = new ArrayList<>(hits.length);for (SearchHit hit : hits) {// 4.4.获取sourceString json = hit.getSourceAsString();// 4.5.反序列化,非高亮的HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);// 4.6.处理高亮结果// 1)获取高亮mapMap<String, HighlightField> map = hit.getHighlightFields();if (map != null && !map.isEmpty()) {// 2)根据字段名,获取高亮结果HighlightField highlightField = map.get("name");if (highlightField != null) {// 3)获取高亮结果字符串数组中的第1个元素String hName = highlightField.getFragments()[0].toString();// 4)把高亮结果放到HotelDoc中hotelDoc.setName(hName);}}// 4.8.排序信息Object[] sortValues = hit.getSortValues(); // 获取排序结果if (sortValues.length > 0) {/*** 由于该程序是根据距离[酒店距你选择位置的距离]进行排序,所以排序结果为距离*/hotelDoc.setDistance(sortValues[0]);}// 4.9.放入集合hotels.add(hotelDoc);}return new PageResult(total, hotels);}/*** 多条件聚合*/@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.query查询信息buildBasicQuery(params, request);// 2.2.sizerequest.source().size(0);// 2.3.聚合buildAggregation(request);// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1.根据品牌名称,获取聚合结果List<String> brandList = getAggByName(aggregations, "brandAgg");// 放入mapresult.put("品牌",brandList);// 4.2.根据城市名称,获取聚合结果List<String> cityList = getAggByName(aggregations, "cityAgg");// 放入mapresult.put("城市",cityList);// 4.3.根据星级名称,获取聚合结果List<String> starList = getAggByName(aggregations, "starAgg");// 放入mapresult.put("星级",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}public List<String> getAggByName(Aggregations aggregations, String aggName) {// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get(aggName);// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历List<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();brandList.add(key);}return brandList;}public void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}/*** 自动补全查询*/@Overridepublic List<String> getSuggestion(String key)  {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix(key)));// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsList<String> result = new ArrayList<>();for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();result.add(str);}return result;} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long hotelId) {try {// 1.创建requestDeleteRequest request = new DeleteRequest("hotel", hotelId.toString());// 2.发送请求restHighLevelClient.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("删除酒店数据失败", e);}}@Overridepublic void saveById(Long hotelId) {try {// 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)Hotel hotel = getById(hotelId);// 转换HotelDoc hotelDoc = new HotelDoc(hotel);// 1.创建RequestIndexRequest request = new IndexRequest("hotel").id(hotelId.toString());// 2.准备参数request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求restHighLevelClient.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("新增酒店数据失败", e);}}
}

二、对发送者hotel-admin的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件【同上】
  2. 定义mq的一些常量【同上】
  3. 当发送者对mysql数据库改动时,发送消息给MQ
@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;// 注入发送消息的api@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 根据id查询*/@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id){return hotelService.getById(id);}/*** 查询当前页内容*/@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size){Page<Hotel> result = hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}/*** 新增,并发送给mq消息*/@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){// 新增酒店hotelService.save(hotel);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 修改,并发送给mq消息*/@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 删除,并发送给mq消息*/@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);}
}

四、集群

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
>> 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
>> 单点故障问题:将分片数据在不同节点备份(replica )

4.1 搭建ES集群

我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

  1. 创建es集群
    首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:es01:image: elasticsearch:7.12.1container_name: es01environment:- node.name=es01- cluster.name=es-docker-cluster- discovery.seed_hosts=es02,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data01:/usr/share/elasticsearch/dataports:- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d
  1. 集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

课前资料已经提供了安装包:

解压即可使用,非常方便。

解压好的目录如下:

进入对应的bin目录:

双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。
在这里插入图片描述

  1. 创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量"number_of_replicas": 1 // 副本数量},"mappings": {"properties": {// mapping映射定义 ...}}
}

2)利用cerebro创建索引库

利用cerebro还可以创建索引库:
在这里插入图片描述

填写索引库信息:

点击右下角的create按钮:
在这里插入图片描述

  1. 查看分片效果

回到首页,即可查看索引库分片效果:
在这里插入图片描述

4.2 集群职责和脑裂问题

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

4.3 集群故障转移

在这里插入图片描述

4.4 集群分布式存储与查询

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/549475.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于PyTorch的视频分类实战

1、数据集下载 官方链接&#xff1a;https://serre-lab.clps.brown.edu/resource/hmdb-a-large-human-motion-database/#Downloads 百度网盘连接&#xff1a; https://pan.baidu.com/s/1sSn--u_oLvTDjH-BgOAv_Q?pwdxsri 提取码: xsri 官方链接有详细的数据集介绍&#xf…

基于SpringBoot的后勤管理系统【附源码】

后勤管理系统开发说明 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myecli…

3d导出stl格式模型破碎是什么原因,怎么解决?---模大狮模型网

在导出3D模型为STL格式时出现破碎(或称为碎片化)的情况通常是由于模型中存在几何上的问题造成的。以下是一些可能导致STL模型破碎的原因以及解决方法&#xff1a; 3d导出stl格式模型破碎的原因&#xff1a; 模型不封闭&#xff1a;STL格式要求模型必须是封闭的实体&#xff0c…

【ArcGISProSDK】获取扩展模块许可到期时间

结果 以下是获取的3D分析模块的许可到期时间 代码 var licenseExpirationDate ArcGIS.Core.Licensing.LicenseInformation.GetExpirationDate(LicenseCodes.Analyst3D); 扩展模块 MemberDescriptionAnalyst3D3D AnalystAviationAirportsAviation and AirportsBusinessAnal…

数据可视化实战(二)

将每个城市在每个月份平均PM2.5绘制成折线图 import pandas as pd import matplotlib.pyplot as plt df pd.read_excel(./PM2.5.xlsx)display(df.head(10)) df.shape # (161630, 15)城市年份月份日期小时季节PM2.5露点湿度压强温度风向累计风速降水量累计降水量0北京2010112…

【Python循环4/5】跳出循环的办法

目录 导入 break 具体用法 在for循环中的运用 在while循环中的运用 continue 具体用法 区别 总结 导入 前几天的博文里&#xff0c;我们学习了for循环和while循环。 无论是for循环还是while循环&#xff0c;默认的终止条件都是边界条件。在触发边界条件之前&am…

电脑插上网线之后仍然没网络怎么办?

前言 有小伙伴在使用Windows系统的时候&#xff0c;经常会遇到电脑没网络&#xff0c;但又不知道具体怎么调整才好。 本篇内容适合插网线和使用Wi-Fi的小伙伴&#xff0c;文章本质上是重置电脑的网络设置。 注意事项&#xff1a;网络重置操作会让已连接过的wifi密码丢失&…

使用 stable-diffusion 入门级教程【Mac】

最近一直在短视频平台刷到AI生成的图片&#xff0c;质量也非常不错。术哥也跟我讲解了下如何安装使用。于是周末试了试。 也差点变成从入门到放弃了&#xff0c;所以也把过程中遇到的问题记录一下。 目前基本上运行正常&#xff0c;只是内存稍微小了点&#xff0c;把质量调低即…

【PyTorch】一文详细介绍【0维】张量

【PyTorch】一文详细介绍【0维】张量 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您的订阅和支持~ &#x1…

供应链投毒预警 | 恶意Py组件tohoku-tus-iot-automation开展窃密木马投毒攻击

概述 上周&#xff08;2024年3月6号&#xff09;&#xff0c;悬镜供应链安全情报中心在Pypi官方仓库&#xff08;https://pypi.org/&#xff09;中捕获1起新的Py包投毒事件&#xff0c;Python组件tohoku-tus-iot-automation 从3月6号开始连续发布6个不同版本恶意包&#xff0c…

20240318uniapp怎么引用组件

在script中增加 import index from "/pages/index/index.vue" 把index直接整个作为一个组件引入 然后注册组件 在export default中增加 components: {index:index }, 注册了index组件&#xff0c;内容为import的index 然后就可以在template里使用 <index&…

面试笔记——Redis(缓存击穿、缓存雪崩)

缓存击穿 缓存击穿&#xff08;Cache Breakdown&#xff09;&#xff1a; 当某个缓存键的缓存失效时&#xff08;如&#xff0c;过期时间&#xff09;&#xff0c;同时有大量的请求到达&#xff0c;并且这些请求都需要获取相同的数据&#xff0c;这些请求会同时绕过缓存系统&a…