MySQL与ES数据同步之异步调用

文章目录

  • 简述
  • SpringBoot项目
    • 引入依赖
    • 配置文件
    • 项目结构
    • 实体类
    • 配置类
      • RabbitMQ交换机队列声明,绑定配置类
      • 回调接口配置类
    • Mapper接口
      • UserMapper接口
      • UserEsMapper
    • Controller类
    • Service接口
    • Service实现类
    • 监听类/消费者

简述

上一篇是同步调用,我们在中间加上MQ就可以实现异步调用,这种方式性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
同时也会带来一些问题,首先还是代码侵入强,其次系统复杂度会增加,因为引入了消息中间件
可能出现延时问题:MQ是异步消费模型,可能会造成延时。
这种方案也不是很推荐,简单了解学习一下就好。

下面通过SpringBoot项目演示一下,首先本地要有MQ,我这里使用RabbitMQ。若本地没有,可移步:Windows版Docker安装RabbitMQ
Linux的Docker也类似

对RabbitMQ还不是很了解的,可以打开我的主页查看RabbitMQ系列教程

这里只做最简单的MQ可靠性配置

SpringBoot项目

引入依赖

全部依赖如下

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置文件

注意修改Mysql,ES,rabbitmq地址及信息

# 端口号8080
server:port: 8080# 数据库名:mysql,用户名root,密码123456
spring:datasource:username: rootpassword: 123456url: jdbc:mysql://mysql地址:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghaidriver-class-name: com.mysql.cj.jdbc.Driverelasticsearch:rest:uris: ES地址:9200rabbitmq:host: rabbitmq地址port: 5672username: adminpassword: admin#确认消息已发送到交换机publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true# mybatis-plus配置
mybatis-plus:# xml文件位置mapper-locations: classpath:mapper/*.xml

项目结构

在这里插入图片描述

实体类

/*** mysql(user)与ES(user-demo)实体类*/
@Data
@TableName(value = "user_t")
@Document(indexName = "user-demo")
public class User {@Idprivate String id;private String userName;private String address;
}

配置类

RabbitMQ交换机队列声明,绑定配置类

/*** RabbitMQ交换机队列声明,绑定配置类*/
@Configuration
public class Config {//交换机名称public static final String X_EXCHANGE = "X";//队列名称public static final String QUEUE_INSERT = "A";public static final String QUEUE_DELETE = "B";public static final String QUEUE_UPDATE = "C";//声明交换机xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//声明队列A@Bean("queueA")public Queue queueInsert() {return QueueBuilder.durable(QUEUE_INSERT).build();}//声明队列B@Bean("queueB")public Queue queueDelete() {return QueueBuilder.durable(QUEUE_DELETE).build();}//声明队列C@Bean("queueC")public Queue queueUpdate() {return QueueBuilder.durable(QUEUE_UPDATE).build();}//绑定交换机与队列//A与X通过XA线路绑定@Beanpublic Binding queueInsertBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//B与X通过XB线路绑定@Beanpublic Binding queueDeleteBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//C与X通过XC线路绑定@Beanpublic Binding queueUpdateBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

回调接口配置类

/*** 回调接口*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {//内部接口注入类中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确定回调方法* 1.发消息 交换机接收到消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 1.3 cause null* 2.发消息 交换机接受失败 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机收到消息 ack=false* 2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机回报消息:收到id为:{}的消息", id);} else {log.info("交换机回报消息:未经收到id为:{}的消息,原因为:{}", id, cause);}}/*** 队列失败回报* @param message 消息* @param i 返回码* @param s 返回信息* @param s1 交换机* @param s2 路由*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("队列回报消息:消息被交换机:{}退回,路由key:{},退回原因:{}", s1,s2,s);}
}

Mapper接口

UserMapper接口

/*** mysql user实体Mapper接口*/
public interface UserMapper extends BaseMapper<User> {
}

UserEsMapper

/*** ES user-demo实体Mapper接口*/
@Repository
public interface UserEsMapper extends ElasticsearchRepository<User,String> {}

Controller类

此处Controller充当生产者,接到请求,先执行mysql操作,然后将消息按情况通过交换机转发到不同的队列,相应的消费者收到消息后对ES进行处理

/*** 异步调用方式实现mysql与ES数据同步Controller/消息生产者*/
@Slf4j
@RestController
@RequestMapping(value = "/asyn")
public class DataController {@Resourceprivate IDataService dataService;@Resourceprivate RabbitTemplate rabbitTemplate;/*** 同步更新mysql和ES的user信息* @param user user实体*/@GetMapping("/update")public void updateData(User user){dataService.updateMysqlData(user);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);log.info("Producer消息:已发送消息:{}到队列A中等待ES更新处理,消息ID:{}",user.getId(),key);}/*** 查询user表信息* @return user信息集合*/@GetMapping("/findData")public List<User> findAllData(){return dataService.findAllData();}/*** 同步根据id删除mysql和ES中user对应的数据信息* @param id 需要删除的信息id*/@GetMapping("/delete")public void deleteDataById(String id){dataService.deleteDataById(id);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XB",id,correlationData);log.info("Producer消息:已发送消息:{}到队列B中等待ES删除处理,消息ID:{}",id,key);}/*** 同步新增mysql和ES的user数据* @param user user实体*/@GetMapping("addData")public void addData(User user){dataService.addData(user);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);log.info("Producer消息:已发送消息:{} 到队列A中等待ES新增处理,消息ID:{}",user.getId(),key);}/*** 同步删除mysql和ES中所有user信息*/@GetMapping("deleteAll")public void deleteAllData(){dataService.deleteAllData();dataService.esDeleteAllData();}/*** 查询ES中所有user信息*/@GetMapping("findEs")public Iterable<User> findEs(){return dataService.findEs();}
}

Service接口

/*** 异步调用方式实现mysql与ES数据同步Service*/
public interface IDataService extends IService<User> {/*** 根据id更新mysql数据* @param user 需要更新数据的user对象*/void updateMysqlData(User user);/*** 查询所有数据* @return user对象集合*/List<User> findAllData();/*** mysql根据id删除信息* @param id 需要删除信息的id*/void deleteDataById(String id);/*** mysql新增数据* @param user 需要新增数据的对象*/void addData(User user);/*** ES根据ID删除数据* @param id 需要删除信息的id*/void esDeleteDataById(String id);/*** ES新增/根据ID修改数据* @param user 需要新增/根据ID修改数据的对象*/void esAddData (User user);/*** mysql删除user表所有数据*/void deleteAllData();/*** es删除index=user-demo中所有数据*/void esDeleteAllData();/*** 查询ES中所有数据信息*/Iterable<User> findEs();
}

Service实现类

/*** 异步调用方式实现mysql与ES数据同步Service实现类*/
@Service
public class DataServiceImpl extends ServiceImpl<UserMapper, User> implements IDataService {@Resourceprivate UserMapper userMapper;@Resourceprivate UserEsMapper userEsMapper;/*** 根据id更新mysql数据* @param user 需要更新数据的user对象*/@Overridepublic void updateMysqlData(User user) {userMapper.updateById(user);}/*** 查询所有数据* @return user对象集合*/@Overridepublic List<User> findAllData() {return userMapper.selectList(null);}/*** mysql根据id删除信息* @param id 需要删除信息的id*/@Overridepublic void deleteDataById(String id) {userMapper.deleteById(id);}/*** mysql新增数据* @param user 需要新增数据的对象*/@Overridepublic void addData(User user) {userMapper.insert(user);}/*** ES根据ID删除数据* @param id 需要删除信息的id*/@Overridepublic void esDeleteDataById(String id) {userEsMapper.deleteById(id);}/*** ES新增/根据ID修改数据* @param user 需要新增/根据ID修改数据的对象*/@Overridepublic void esAddData(User user) {userEsMapper.save(user);}/*** mysql删除user表所有数据*/@Overridepublic void deleteAllData() {userMapper.delete(null);}/*** es删除index=user-demo中所有数据*/@Overridepublic void esDeleteAllData() {userEsMapper.deleteAll();}/*** 查询ES中user所有信息* @return 查询user信息集合*/@Overridepublic Iterable<User> findEs() {return userEsMapper.findAll();}}

监听类/消费者

/*** 异步调用方式实现mysql与ES数据同步消息消费者*/
@Slf4j
@Component
public class Consumer {@Resourceprivate IDataService dataService;@Resourceprivate UserMapper userMapper;//接收消息@RabbitListener(queues="A")public void addData(Message message){log.info("Consumer消息:当前时间:{},收到A队列的消息:{},进行ES新增操作",new Date().toString(),new String(message.getBody()));QueryWrapper<User> queryWrapper = new QueryWrapper<>();queryWrapper.eq("id",new String(message.getBody()));User user = userMapper.selectOne(queryWrapper);dataService.esAddData(user);log.info("ES新增/更新数据为:{}",user);}@RabbitListener(queues = "B")public void delete(Message message){log.info("Consumer消息:当前时间:{},收到B队列的消息:{},进行ES删除操作",new Date().toString(),new String(message.getBody()));dataService.esDeleteDataById(new String(message.getBody()));}@RabbitListener(queues="C")public void update(Message message){}
}

操作完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/107832.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新手询问想要成功学好嵌入式开发有什么建议吗?

今日话题&#xff0c;想要成功学好嵌入式开发有什么建议吗&#xff1f;想要学好的话选择一门合适的编程语言是关键。虽然嵌入式开发支持多种语言&#xff0c;但C和C仍然是最常用的。如果你是初学者&#xff0c;从学习C语言开始是一个不错的选择。它相对容易学习&#xff0c;而且…

MCU芯片测试:性能指标测试痛点是什么?ATECLOUD能否解决?

MCU芯片测试指标的核心是性能指标&#xff0c;包括处理器性能、存储器容量和读写速度&#xff0c;外设性能等。芯片测试对自动化测试的要求很高&#xff0c;ATECLOUD-IC不仅解决了传统测试方法的问题&#xff0c;而且也可以满足芯片测试的高要求&#xff0c;高效地完成MCU芯片性…

Vue组合式API

文章目录 Vue组合式API1. 概念1.1 传统组件1.2 组合式API 2. setup 组件 Vue组合式API 1. 概念 Vue3 组合式 API&#xff08;Composition API&#xff09; 主要用于在大型组件中提高代码逻辑的可复用性。 传统的组件随着业务复杂度越来越高&#xff0c;代码量会不断的加大&am…

Ubuntu18中NVIDIA,cuda,cudnn,pytorch安装

注意&#xff1a;nvidia驱动和cuda,cudnn,pytroch,python的对应关系 linux安装pytorch&#xff08;包括cuda与cudnn&#xff09;_linux清华园按照pytorch1.12_BryceRui的博客-CSDN博客 安装流程&#xff1a;安装cuda&#xff08;包括nvidia驱动&#xff09; cudnn python安装…

软件设计模式系列之三———工厂方法模式

1 模式的定义 工厂方法模式是一种常见的设计模式&#xff0c;属于创建型设计模式之一&#xff0c;它在软件工程中用于对象的创建。该模式的主要思想是将对象的创建过程抽象化&#xff0c;将具体对象的实例化延迟到子类中完成&#xff0c;以便在不同情况下可以创建不同类型的对…

yolov5训练ExDark数据集(附全过程代码,超详细教程,无坑!)

1.数据集获取 ExDark&#xff1a;免费下载地址 数据内容展示&#xff1a;共12个类别7363张 2.ExDark转yolo格式 ExDark的label文件内容 yolo的label文件内容 转换前准备如下 其中&#xff1a; anndir等于ExDark数据集中的Annotations文件夹&#xff0c;用于存放标签数据…

【LeetCode75】第五十四题 咒语和药水的成功对数

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个数组&#xff0c;要我们找出第一个数组中每个元素能和另一个数组的元素匹配的数量。匹配的条件是乘积大于特定的值。 那么…

论文阅读《Robust Monocular Depth Estimation under Challenging Conditions》

论文地址&#xff1a;https://arxiv.org/pdf/2308.09711.pdf 源码地址&#xff1a;https://github.com/md4all/md4all 概述 现有SOTA的单目估计方法在理想的环境下能得到满意的结果&#xff0c;而在一些极端光照与天气的情况下往往会失效。针对模型在极端条件下的表现不佳问题&…

【软件测试】selenium3

自动化测试的概念 自动化测试指软件测试的自动化&#xff0c;在预设状态下运行应用程序或者系统&#xff0c;预设条件包括正常和异常&#xff0c;最 后评估运行结果。将人为驱动的测试行为转化为机器执行的过程。 自动化测试就相当于将人工测试手段进行转换&#xff0c;让代码…

flex布局语法以及实操,一文带你吃透flex布局的基础

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、Flex是什么&#xff1f; 二、Flex语法知识 1.轴的使用 1.1flex-direction属性 ​编辑 2.基础知识 2.1justify-content属性 2.1.1justify-content: fl…

【HTML5高级第二篇】WebWorker多线程、EventSource事件推送、History历史操作

文章目录 一、多线程1.1 概述1.2 体会多线程1.3 多线程中数据传递和接收 二、事件推送2.1 概述2.2 onmessage 事件 三、history 一、多线程 1.1 概述 前端JS默认按照单线程去执行&#xff0c;一段时间内只能执行一件事情。举个栗子&#xff1a;比方说古代攻城游戏&#xff0c…

CRM软件系统排名靠前的相关推荐

CRM软件是企业管理客户关系的重要工具&#xff0c;它可以帮助企业提高销售效率、增强客户满意度、提升市场竞争力。在众多的CRM软件中&#xff0c;排名靠前的CRM软件有哪些&#xff1f;推荐您一款领先的CRM软件——Zoho CRM。 Zoho CRM是一款全球知名的CRM软件&#xff0c;累计…