elasticsearch 数据同步

news/2025/1/21 15:10:20/文章来源:https://www.cnblogs.com/WarBlog/p/18683079

数据同步

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步

异步通知

流程如下:

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

MQ结构如图:

拉取MQ镜像

docker pull rabbitmq:3-management

拉取失败超时 Error response from daemon: Get “https://registry-1.docker.io/v2/“解决方案

https://registry-1.docker.io/v2/ 地址是 docker官方的镜像源,下载很慢的,一般会自己指定国内映射的加速镜像源。

修改或新建/etc/docker/daemon.json 文件

{"registry-mirrors" : ["https://2a6bf1988cb6428c877f723ec7530dbc.mirror.swr.myhuaweicloud.com","https://docker.m.daocloud.io","https://hub-mirror.c.163.com","https://mirror.baidubce.com","https://your_preferred_mirror","https://dockerhub.icu","https://docker.registry.cyou","https://docker-cf.registry.cyou","https://dockercf.jsdelivr.fyi","https://docker.jsdelivr.fyi","https://dockertest.jsdelivr.fyi","https://mirror.aliyuncs.com","https://dockerproxy.com","https://mirror.baidubce.com","https://docker.m.daocloud.io","https://docker.nju.edu.cn","https://docker.mirrors.sjtug.sjtu.edu.cn","https://docker.mirrors.ustc.edu.cn","https://mirror.iscas.ac.cn","https://docker.rainbond.cc"],"insecure-registries" : ["docker-registry.zjq.com"],"log-driver": "json-file","log-opts": {"max-size": "10m","max-file": "10"},"data-root": "/data/docker"
}
View Code
systemctl daemon-reload
systemctl restart docker
// 在执行上面命令时,以前创建的容器会被删除
docker pull rabbitmq:3-management
docker run \-e RABBITMQ_DEFAULT_USER=guest \-e RABBITMQ_DEFAULT_PASS=guest \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

重新加载镜像,创建并运行容器(注意:加载手动上传镜像(tar包文件),在创建运行容器时,必须要cd 到tar包文件所在目录下

// 加载上传的镜像
docker load -i /usr/local/docker/tools/kibana.tarcd  /usr/local/docker/tools/
// 创建并运行容器
docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=es-net \
-p 5601:5601  \
kibana:7.12.1
View Code

通过MQ实现数据同步

1)引入依赖

        <!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2)配置MQ服务

spring:... ...rabbitmq:host: 192.168.xxx.xxxxport: 5672username: guestpassword: guestvirtual-host: /

3) 声明队列交换机名称

hotel-admin(消息发送方)

hotel-demo(消息接收方)

创建相同包(cn.marw.hotel.constatnts)并在其下新建一个类MqConstants

 1 package cn.marw.hotel.constatnts;
 2 
 3     public class MqConstants {
 4     /**
 5      * 交换机
 6      */
 7     public final static String HOTEL_EXCHANGE = "hotel.topic";
 8     /**
 9      * 监听新增和修改的队列
10      */
11     public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
12     /**
13      * 监听删除的队列
14      */
15     public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
16     /**
17      * 新增或修改的RoutingKey
18      */
19     public final static String HOTEL_INSERT_KEY = "hotel.insert";
20     /**
21      * 删除的RoutingKey
22      */
23     public final static String HOTEL_DELETE_KEY = "hotel.delete";
24 }
View Code

4)发送方(发送MQ消息)

在hotel-admin中的增、删、改业务中分别发送MQ消息:

5)接收方(接受MQ消息)

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
  • 删除消息:根据传递的hotel的id删除索引库中的一条数据

定义接口:hotel-demo的cn.marw.hotel.service包下的IHotelService接口中添加 新增、删除业务

1 void deleteById(Long id);
2 
3 void insertById(Long id);

实现接口:hotel-demo中的cn.marw.hotel.service.impl包下的HotelService中实现业务:

@Autowired
private RestHighLevelClient client;@Override
public void deleteById(Long id) {try {// 1.准备RequestDeleteRequest request = new DeleteRequest("hotel", id.toString());// 2.发送请求
        client.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}@Override
public void insertById(Long id) {try {// 0.根据id查询酒店数据Hotel hotel = getById(id);// 转换为文档类型HotelDoc hotelDoc = new HotelDoc(hotel);// 1.准备Request对象IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());// 2.准备Json文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求
        client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}

7)声明队列和交换机(通常消息的接收方,来完成声明)

声明方式:基于Bean和基于注解

7.1)基于Bean

7.1.1)在hotel-demo中,定义配置类,声明队列、交换机:

 1 @Configuration
 2 public class MqConfig {
 3     @Bean
 4     public TopicExchange topicExchange(){
 5         return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
 6     }
 7 
 8     @Bean
 9     public Queue insertQueue(){
10         return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
11     }
12 
13     @Bean
14     public Queue deleteQueue(){
15         return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
16     }
17 
18     @Bean
19     public Binding insertQueueBinding(){
20         return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
21     }
22 
23     @Bean
24     public Binding deleteQueueBinding(){
25         return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
26     }
27 }
View Code

7.1.2)编写监听器

在hotel-demo中的cn.marw.hotel.mq包新增一个类:

@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)public void listenHotelInsertOrUpdate(Long id){hotelService.insertById(id);}/*** 监听酒店删除的业务* @param id 酒店id*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)public void listenHotelDelete(Long id){hotelService.deleteById(id);}
}
 

7.2)基于注解

 1 @Component
 2 public class HotelListener {
 3 
 4     @Autowired
 5     private IHotelService hotelService;
 6 
 7     @RabbitListener(bindings = @QueueBinding(
 8             value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
 9             exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
10             key = HotelMqConstants.INSERT_KEY
11     ))
12     public void listenHotelInsert(Long hotelId){
13         // 新增
14         hotelService.saveById(hotelId);
15     }
16 
17     @RabbitListener(bindings = @QueueBinding(
18             value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
19             exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
20             key = HotelMqConstants.DELETE_KEY
21     ))
22     public void listenHotelDelete(Long hotelId){
23         // 删除
24         hotelService.deleteById(hotelId);
25     }
26 }

声明成功后,Rabbit MQ服务端就会出现对应的交换机和队列,如图

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/872817.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS实现各种形状

CSS3的一个非常酷的特性是允许我们创建各种规则和不规则形状的图形,从而可以减少图片的使用。以前只能在Photoshop等图像编辑软件中制作的复杂图形现在使用CSS3就可以完成了。通过使用新的CSS属性,像transform和border-radius,我们可以创建非常漂亮和复杂的图形效果。 圆形 …

如何防止网络钓鱼攻击

一、什么是网络钓鱼 ? 网络钓鱼是指试图通过操纵受害者来窃取敏感信息的行为。攻击者伪装成合法来源并发送伪装的消息,通常是通过电子邮件,希望您点击链接或打开附件。网络钓鱼者使用社会工程学并依靠人为错误来诱捕受害者。根据 AAG 最近的一项研究,钓鱼仍然是最常见的网络…

IAT 隐藏和混淆

一、介绍 导入地址表 (IAT) 包含有关 PE 文件的信息,例如使用过的函数和导出它们的 DLL。此类信息可用于对二进制文件进行签名和检测,如下图所示PE 文件导入被认为高度可疑的函数二、隐藏混淆方法 (1)IAT 隐藏和混淆—方法 1 自定义函数可以在运行时使用 GetProcAddress、G…

React席哪个能优化

使用map + keyuseMemo,useCallbak组件Fragement异步组件lazy路由懒加载PureComponent,meno拓展时间分片,延迟加载SSR本作品采用 知识共享署名-非商业性使用 2.5 中国大陆许可协议进行许可。

【Unity游戏开发】基于xLua构建一个简单的3D游戏框架

一、xLua简介 xLua是基于Lua语言的开源插件,能够支持在Unity中嵌入Lua脚本(Lua脚本支持热更,适用于游戏的业务逻辑开发和维护) xLua源码地址 二、构建方法新建一个Unity项目(模板选择Universal 3D(URP))下载xLua源码,将Assets目录及其子目录下的全部文件拷贝到Unity项…

Windows RocketMQ 安装-截止当前最新版本(RocketMQ-5.3.1)图文教程

Windows RocketMQ 安装(图文教程) Windows RocketMQ 安装,截止当前最新版本(RocketMQ-5.3.1)图文教程,本文只是最简单的安装方法,旨在能快速使用,若需要更多的配置,则需要你自行查阅官方文档,或互联网搜索答案咯,哈哈哈哈 前言 本文中所有的路径,包括 JAVA 环境,都不…

2025年职场人常用的桌面日程管理软件有哪些?推荐这五款

在繁忙的职场生活中,一款高效的日程管理软件无疑是提升工作效率的秘密武器。 进入2025年,今天给大家介绍5款打工人常用的电脑桌面日程管理软件,它们各有优缺点,看看哪款是你需要的吧! 一、Win系统日历 作为Windows系统自带的日程管理工具,Win系统日历以其简洁易用著称。你…

大趋势下企业如何实现智能制造 | 珠海盈致

在当今全球制造业竞争日益激烈的背景下,智能制造已成为企业提升核心竞争力的关键路径。随着物联网、大数据、云计算、人工智能等技术的快速发展,智能制造正逐步从概念走向现实,为企业带来生产效率、产品质量、成本控制等方面的显著提升。那么,在大趋势下,企业如何实现智能…

【docker】如何运行没有Root权限的Docker?

以下文章来源于运维自习室 ,作者运维自习室 Rootless模式的目的是让Docker守护进程以非root用户身份运行。该方案以实验特性的方式在v19.03版本引入,并在v20.10版本成为正式功能。 实践 官方文档已经做了详细的介绍,这里仅仅做一下实践复现。 具体实践环境为: CentOS 7.2 d…

从0开始的ctf旅行之pwn篇

*最后更新时间:2025-01-21 10:17:43 星期二 * 零、前言 本篇文章是我个人从0开始打pwn的真实 坐牢 做题经验,包含了大量的参考链接和个人思考,绝大多数题目来自MoeCTF2024(https://ctf.xidian.edu.cn/) 本文默认你有以下基础:python3 会装虚拟机+基本的Linux操作 C语言一、…

识别两个表格文件,根据手机号进行匹配相同行并按照需要字段输出

python代码# -*- coding: utf-8 -*- # encoding:utf-8 from flask import Flask, render_template, request, send_file,jsonify import os, requests import pandas as pd from datetime import datetime, timedelta import time, json from log import logging import thread…

语音播报,套件多少异常的问题。(含源代码)

在工作中遇到一家工厂老板的需求:因为产品是有多个配件组成,在生产的时候,经常会多生产,少生产,在组装时,也会出现配件多少的问题,现就此问题设计一款程序。多出,少的,异常的,正常好,会开语音播报。现将全部代码给出以备。 import inspect import os import threadi…