RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理

前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。

1、在config包里新建一个名为ConfirmConfig的类用于编写配置交换机、队列、routingkey的代码

代码如下:

package com.ken.springbootrqbbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {//交换机public static final String EXCHANGE_NAME = "confirm_exchange";//队列public static final String QUEUE_NAME = "confirm_queue";//routingkeypublic static final String ROUTING_KEY = "confirm";//声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(EXCHANGE_NAME);}//声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}//绑定交换机和队列@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);}}

2、在controller包里新建一个名为ProducerController的类用于编写充当生产者发送消息的代码

代码如下:

package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowired(required = false)private RabbitTemplate rabbitTemplate;//发消息@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message);log.info("发送消息内容:{}",message);}}

3、在consumer包里新建一个名为Consumer的类用于编写充当消费者消费消息的代码

代码如下:

package com.ken.springbootrqbbitmq.consumer;import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class Consumer {@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)public void receiveConfirmMessage(Message message) {String msg = new String(message.getBody());log.info("接收到队列的消息为:{}",msg);}}

4、启动项目,在浏览器地址栏调用发送消息的接口,查看生产者是否运行成功并能发送消息http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

5、前言里我们说过,怎么在RabbitMQ宕机的情况下,保证生产者发送的消息不丢失呢,这时候就需要用到回调函数了,交换机本身收到消息后会确认消息,如果交换机没有确认或者确认消息失败,都视为发送消息失败,然后触发回调接口,告诉生产者消息发送失败,这样,消息接收成功与否我们都能通过回调方法返回的消息知道了

(1)在config包里新建一个名为MyCallBack的类用于编写交换机的确认回调方法

代码如下:

package com.ken.springbootrqbbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowired(required = false)private RabbitTemplate rabbitTemplate;/*** @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行*/@PostConstructpublic void init() {//把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法* 1、第一个参数:correlationData保存回调消息的ID以及相关信息* 2、第二个参数:交换机收到消息就返回true,否则返回false* 3、第三参数:原因(返回失败的原因,如果成功返回的是null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id =  correlationData != null ? correlationData.getId() : "";if(ack) {log.info("交换机已经收到id为{}的消息",id);}else {log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);}}
}

6、在上述步骤可得知confirm方法有一个类型为CorrelationData的参数correlationData,这个参数实际上是空的,并没有值,需要生产者发送,correlationData参数才会有值(connfirm方法的其余两个参数ack和cause默认有值)所以我们需要修改生产者的代码

 代码如下:

package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowired(required = false)private RabbitTemplate rabbitTemplate;//发消息@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY,message,correlationData);log.info("发送消息内容:{}",message);}}

7、在配置文件加上以下配置开启交换机确认发布模式

spring.rabbitmq.publisher-confirm-type=correlated

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated

效果图:

8、启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口,消费者成功消费消息

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

9、把生产者要发送到的交换机改成不存在的,用以模拟交换机出问题的情景

package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowired(required = false)private RabbitTemplate rabbitTemplate;//发消息@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME + "1",ConfirmConfig.ROUTING_KEY,message,correlationData);log.info("发送消息内容:{}",message);}}

效果图:

10、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出了交换机接收消息失败的原因

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

11、把RoutingKey改成不存在的,用以模拟队列出问题的情景

package com.ken.springbootrqbbitmq.controller;import com.ken.springbootrqbbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {@Autowired(required = false)private RabbitTemplate rabbitTemplate;//发消息@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME,ConfirmConfig.ROUTING_KEY + "2",message,correlationData);log.info("发送消息内容:{}",message);}}

效果图:

12、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机调用了回调接口并打印出交换机接收消息成功,但消费者没有消费成功的日志输出,因为RoutingKey错了,交换机没有把消息发送到队列里,队列里没消息,自然消费者也就没有消费到消息了,但这个结果不符合我们的预期,因为这次丢失了消息,丢失消息却没有回馈消息丢失,实际上应该调用回调接口反馈消息丢失,所以我们需要继续往下改进代码。

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图: 

13、给配置文件加上以下配置,用以回退消息

spring.rabbitmq.publisher-returns=true

配置文件完整内容如下:

spring.rabbitmq.host=192.168.194.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
#none(禁用发布确认模式,默认值)
#correlated(发布消息成功到交换机后会触发回调方法)
#simple(和correlated一样会触发回调方法,消息发布成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果)
spring.rabbitmq.publisher-confirm-type=correlated
#一旦投递消息失败或者路由失败,是否回退消息给生产者
spring.rabbitmq.publisher-returns=true

14、使用RabbitTemplate的内置接口回退消息

代码如下:

package com.ken.springbootrqbbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired(required = false)private RabbitTemplate rabbitTemplate;/*** @PostConstruct注解,在对象加载完依赖注入后执行,它通常都是一些初始化的操作,但初始化可能依赖于注入的其他组件,所以要等依赖全部加载完再执行*/@PostConstructpublic void init() {//把当前实现类MyCallBack注入到RabbitTemplate类的ConfirmCallback接口里面rabbitTemplate.setConfirmCallback(this);//把当前实现类MyCallBack注入到RabbitTemplate类的ReturnCallback接口里面rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* 1、第一个参数:correlationData保存回调消息的ID以及相关信息* 2、第二个参数:交换机收到消息就返回true,否则返回false* 3、第三参数:原因(返回失败的原因,如果成功返回的是null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id =  correlationData != null ? correlationData.getId() : "";if(ack) {log.info("交换机已经收到id为{}的消息",id);}else {log.info("交换机还未收到id为{}的消息,原因为{}",id,cause);}}/*** 可以在当消息传递过程中不可达目的地时将消息返回给生产者* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消息{},被交换机{}退回,退回原因:{},路由routingkey:{}",new String(message.getBody()),exchange,replyText,routingKey);}}

15、重新启动项目,在浏览器地址栏调用发送消息的接口,可以看到生产者发送消息成功,交换机收到消息发不过去队列后把消息回退了,保证了消息不丢失。

http://localhost:8080/confirm/sendMessage/我是消息

例:

效果图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18125.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第44节:cesium 大雾效果(cesium自带)(含源码+视频)

结果示例: 完整源码: <template><div class="viewer"><vc-viewer @ready="ready" :logo="false"><!

Android 视频直播提拉流 嵌入式硬件 流媒体开发详细内容

1 Linux 系统编程网络编程基础 2 Linux 网络编程流媒体服务器&#xff0c;客户端开发实践 3 Android流媒体客户端 FFmpeg OpenGL ES 开发实践 4 Android H.264 AAC 封装mp4开发实战 5 流媒体开发实战之Rtmp推流 6 流媒体开发实战之RTSP推流 7 流媒体开发实战之UDP 8 P2P点对点项…

路径规划算法:基于晶体结构优化的路径规划算法- 附代码

路径规划算法&#xff1a;基于晶体结构优化的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于晶体结构优化的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化…

Python3+requests搭建接口自动化测试框架

目录 一、接口自动化的意义&#xff08;为什么做这个框架&#xff09; 二、准备工作 三、框架流程及逻辑 四、各模块介绍 五、具体使用 一、接口自动化的意义&#xff08;为什么做这个框架&#xff09; 新版本上线时之前版本的功能需要进行回归测试&#xff0c;导致大量的…

布雷默浪丹 PT 141:189691-06-3,1607799-13-2,Bremelanotide,布美诺肽

Bremelanotide&#xff0c;布雷默浪丹 PT 141&#xff0c;布美诺肽Product structure&#xff1a; Product specifications&#xff1a; 1.CAS No&#xff1a;189691-06-3/1607799-13-2 2.Molecular formula&#xff1a;C50H68N14O10 3.Molecular weight&#xff1a;1025.063 4…

gigachad1靶机详解

gigachad_vh靶机详解 扫描到ip后对ip做一个全面扫描&#xff0c;发现有一个匿名服务器&#xff0c;是可以免密登陆的。 登陆上后发现就一个文件&#xff0c;get到我们电脑上。 file一下发现是一个zip文件&#xff0c;unzip解压一下&#xff0c;发现给了一个用户名chad&#xf…

C++多线程学习(十七、简单实现线程池)

目录 线程池 设计线程池的关键问题 代码 可能出现的疑问 queue> task; 总结&#xff1a; template auto InsertQueue(T&& t, Args&& ...args)->future;(t(args...))> 总结&#xff1a; ThreadPool(size_t size);构造函数 总结&#xff1…

Vue3 CSS v-bind 计算和三元运算

官方文档 中指出&#xff1a;CSS 中的 v-bind 支持 JavaScript 表达式&#xff0c;但需要用引号包裹起来&#xff1a; 例子如下&#xff1a; <script lang"ts" setup> const treeContentWidth ref(140); </script><style lang"less" scop…

springcloud eureka增加安全认证

网上很多资料写的不全&#xff0c;不细致。 springcloud架构&#xff0c;本地运行代码是eureka地址一般为localhost:port&#xff08;自己暴露的端口&#xff09;&#xff0c;例如http://localhost:9000/ &#xff0c;但是如果在服务器&#xff0c;且使用k8s部署&#xff0c;一…

MFC List Control控件添加单元格编辑和单元格下拉列表

文章目录 初始化编译栏的创建与销毁下拉列表的创建与销毁自定义编译栏与下拉框的焦点问题点击CListCtrl 事件处理程序双击添加全部代码 ui 设置 初始化 #define IDC_EDIT_CREATEID 3000 #define IDC_COMBOX_CREATEID 3001int e_Item0; //刚编辑的行 int e_SubItem2; //刚…

JavaWeb 速通JavaScript

目录 一、JavaScript快速入门 1.基本介绍 : 2.JavaScript特点 : 3.JavaScript使用方式 : 1 方式一 : 写在 2 方式二 : 以外部文件形式引入 PS : 注意事项 4.JavaScript查错方式 : 二、JavaScript数据类型 1.变量 : 2.数据类型 : 3.特殊值 : 三、JavaScript运算符 1.算…

对文件中的数据进行排序

#include<stdio.h> #include<stdlib.h> #include<time.h> int main()//生成1000个随机数据 {srand((size_t)time(NULL));FILE* fpfopen("d:/data.txt","w");if(!fp) return -1;for(int i0;i<1000;i)fprintf(fp,"%d\n",ran…