文章目录
- 简述
- SpringBoot项目
- 引入依赖
- 配置文件
- 项目结构
- 实体类
- 配置类
- RabbitMQ交换机队列声明,绑定配置类
- 回调接口配置类
- Mapper接口
- UserMapper接口
- UserEsMapper
- Controller类
- Service接口
- Service实现类
- 监听类/消费者
简述
上一篇是同步调用,我们在中间加上MQ就可以实现异步调用,这种方式性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
同时也会带来一些问题,首先还是代码侵入强,其次系统复杂度会增加,因为引入了消息中间件
可能出现延时问题:MQ是异步消费模型,可能会造成延时。
这种方案也不是很推荐,简单了解学习一下就好。
下面通过SpringBoot项目演示一下,首先本地要有MQ,我这里使用RabbitMQ。若本地没有,可移步:Windows版Docker安装RabbitMQ
Linux的Docker也类似
对RabbitMQ还不是很了解的,可以打开我的主页查看RabbitMQ系列教程
这里只做最简单的MQ可靠性配置
SpringBoot项目
引入依赖
全部依赖如下
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置文件
注意修改Mysql,ES,rabbitmq地址及信息
# 端口号8080
server:port: 8080# 数据库名:mysql,用户名root,密码123456
spring:datasource:username: rootpassword: 123456url: jdbc:mysql://mysql地址:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghaidriver-class-name: com.mysql.cj.jdbc.Driverelasticsearch:rest:uris: ES地址:9200rabbitmq:host: rabbitmq地址port: 5672username: adminpassword: admin#确认消息已发送到交换机publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true# mybatis-plus配置
mybatis-plus:# xml文件位置mapper-locations: classpath:mapper/*.xml
项目结构
实体类
/*** mysql(user)与ES(user-demo)实体类*/
@Data
@TableName(value = "user_t")
@Document(indexName = "user-demo")
public class User {@Idprivate String id;private String userName;private String address;
}
配置类
RabbitMQ交换机队列声明,绑定配置类
/*** RabbitMQ交换机队列声明,绑定配置类*/
@Configuration
public class Config {//交换机名称public static final String X_EXCHANGE = "X";//队列名称public static final String QUEUE_INSERT = "A";public static final String QUEUE_DELETE = "B";public static final String QUEUE_UPDATE = "C";//声明交换机xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//声明队列A@Bean("queueA")public Queue queueInsert() {return QueueBuilder.durable(QUEUE_INSERT).build();}//声明队列B@Bean("queueB")public Queue queueDelete() {return QueueBuilder.durable(QUEUE_DELETE).build();}//声明队列C@Bean("queueC")public Queue queueUpdate() {return QueueBuilder.durable(QUEUE_UPDATE).build();}//绑定交换机与队列//A与X通过XA线路绑定@Beanpublic Binding queueInsertBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//B与X通过XB线路绑定@Beanpublic Binding queueDeleteBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//C与X通过XC线路绑定@Beanpublic Binding queueUpdateBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}
回调接口配置类
/*** 回调接口*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {//内部接口注入类中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确定回调方法* 1.发消息 交换机接收到消息 回调* 1.1 correlationData 保存回调消息的ID及相关信息* 1.2 交换机收到消息 ack=true* 1.3 cause null* 2.发消息 交换机接受失败 回调* 2.1 correlationData 保存回调消息的ID及相关信息* 2.2 交换机收到消息 ack=false* 2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机回报消息:收到id为:{}的消息", id);} else {log.info("交换机回报消息:未经收到id为:{}的消息,原因为:{}", id, cause);}}/*** 队列失败回报* @param message 消息* @param i 返回码* @param s 返回信息* @param s1 交换机* @param s2 路由*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("队列回报消息:消息被交换机:{}退回,路由key:{},退回原因:{}", s1,s2,s);}
}
Mapper接口
UserMapper接口
/*** mysql user实体Mapper接口*/
public interface UserMapper extends BaseMapper<User> {
}
UserEsMapper
/*** ES user-demo实体Mapper接口*/
@Repository
public interface UserEsMapper extends ElasticsearchRepository<User,String> {}
Controller类
此处Controller充当生产者,接到请求,先执行mysql操作,然后将消息按情况通过交换机转发到不同的队列,相应的消费者收到消息后对ES进行处理
/*** 异步调用方式实现mysql与ES数据同步Controller/消息生产者*/
@Slf4j
@RestController
@RequestMapping(value = "/asyn")
public class DataController {@Resourceprivate IDataService dataService;@Resourceprivate RabbitTemplate rabbitTemplate;/*** 同步更新mysql和ES的user信息* @param user user实体*/@GetMapping("/update")public void updateData(User user){dataService.updateMysqlData(user);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);log.info("Producer消息:已发送消息:{}到队列A中等待ES更新处理,消息ID:{}",user.getId(),key);}/*** 查询user表信息* @return user信息集合*/@GetMapping("/findData")public List<User> findAllData(){return dataService.findAllData();}/*** 同步根据id删除mysql和ES中user对应的数据信息* @param id 需要删除的信息id*/@GetMapping("/delete")public void deleteDataById(String id){dataService.deleteDataById(id);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XB",id,correlationData);log.info("Producer消息:已发送消息:{}到队列B中等待ES删除处理,消息ID:{}",id,key);}/*** 同步新增mysql和ES的user数据* @param user user实体*/@GetMapping("addData")public void addData(User user){dataService.addData(user);String key = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(key);rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);log.info("Producer消息:已发送消息:{} 到队列A中等待ES新增处理,消息ID:{}",user.getId(),key);}/*** 同步删除mysql和ES中所有user信息*/@GetMapping("deleteAll")public void deleteAllData(){dataService.deleteAllData();dataService.esDeleteAllData();}/*** 查询ES中所有user信息*/@GetMapping("findEs")public Iterable<User> findEs(){return dataService.findEs();}
}
Service接口
/*** 异步调用方式实现mysql与ES数据同步Service*/
public interface IDataService extends IService<User> {/*** 根据id更新mysql数据* @param user 需要更新数据的user对象*/void updateMysqlData(User user);/*** 查询所有数据* @return user对象集合*/List<User> findAllData();/*** mysql根据id删除信息* @param id 需要删除信息的id*/void deleteDataById(String id);/*** mysql新增数据* @param user 需要新增数据的对象*/void addData(User user);/*** ES根据ID删除数据* @param id 需要删除信息的id*/void esDeleteDataById(String id);/*** ES新增/根据ID修改数据* @param user 需要新增/根据ID修改数据的对象*/void esAddData (User user);/*** mysql删除user表所有数据*/void deleteAllData();/*** es删除index=user-demo中所有数据*/void esDeleteAllData();/*** 查询ES中所有数据信息*/Iterable<User> findEs();
}
Service实现类
/*** 异步调用方式实现mysql与ES数据同步Service实现类*/
@Service
public class DataServiceImpl extends ServiceImpl<UserMapper, User> implements IDataService {@Resourceprivate UserMapper userMapper;@Resourceprivate UserEsMapper userEsMapper;/*** 根据id更新mysql数据* @param user 需要更新数据的user对象*/@Overridepublic void updateMysqlData(User user) {userMapper.updateById(user);}/*** 查询所有数据* @return user对象集合*/@Overridepublic List<User> findAllData() {return userMapper.selectList(null);}/*** mysql根据id删除信息* @param id 需要删除信息的id*/@Overridepublic void deleteDataById(String id) {userMapper.deleteById(id);}/*** mysql新增数据* @param user 需要新增数据的对象*/@Overridepublic void addData(User user) {userMapper.insert(user);}/*** ES根据ID删除数据* @param id 需要删除信息的id*/@Overridepublic void esDeleteDataById(String id) {userEsMapper.deleteById(id);}/*** ES新增/根据ID修改数据* @param user 需要新增/根据ID修改数据的对象*/@Overridepublic void esAddData(User user) {userEsMapper.save(user);}/*** mysql删除user表所有数据*/@Overridepublic void deleteAllData() {userMapper.delete(null);}/*** es删除index=user-demo中所有数据*/@Overridepublic void esDeleteAllData() {userEsMapper.deleteAll();}/*** 查询ES中user所有信息* @return 查询user信息集合*/@Overridepublic Iterable<User> findEs() {return userEsMapper.findAll();}}
监听类/消费者
/*** 异步调用方式实现mysql与ES数据同步消息消费者*/
@Slf4j
@Component
public class Consumer {@Resourceprivate IDataService dataService;@Resourceprivate UserMapper userMapper;//接收消息@RabbitListener(queues="A")public void addData(Message message){log.info("Consumer消息:当前时间:{},收到A队列的消息:{},进行ES新增操作",new Date().toString(),new String(message.getBody()));QueryWrapper<User> queryWrapper = new QueryWrapper<>();queryWrapper.eq("id",new String(message.getBody()));User user = userMapper.selectOne(queryWrapper);dataService.esAddData(user);log.info("ES新增/更新数据为:{}",user);}@RabbitListener(queues = "B")public void delete(Message message){log.info("Consumer消息:当前时间:{},收到B队列的消息:{},进行ES删除操作",new Date().toString(),new String(message.getBody()));dataService.esDeleteDataById(new String(message.getBody()));}@RabbitListener(queues="C")public void update(Message message){}
}
操作完成