Rabbitmq怎么保证消息的可靠性?

一、消费端消息可靠性保证

  1. 消息确认(Acknowledgements)

消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成处理就终止运行,那么消息在超时时间内将不会被删除,会再次被RabbitMQ投递给其他消费者。

      2.死信队列(Dead Letter Queue):

当消息不能被正常消费时(比如达到最大重试次数),可以通过设置TTL(Time To Live)或者死信交换器(Dead Letter Exchange)将消息路由至死信队列,从而有机会后续分析和处理这些无法正常消费的消息。

二、生产端消息可靠性保证:

  1. 消息持久化

当生产者发布消息时,可以选择将其标记为持久化(persistent).这意味着即使 RabbitMQ 服务器重启,消息也不会丢失,因为它们会被存储在磁盘上。

       2.确认(Confirm)机制

开启confirm回调模式后,RabbitMQ会在消息成功写入到磁盘并至少被一个交换器接受后,向生产者发送一个确认(acknowledgement)。若消息丢失或无法投递给任何队列,RabbitMQ将会发送一个否定确认(nack). 生产者可以根据这些确认信号判断消息是否成功送达并采取相应的重试策略。

RabbitMQ作为消息中间件并启用publisher confirms(发布者确认)与publisher returns(发布者退回)机制时,可以确保消息从生产者到交换机的投递过程得到更准确的状态反馈。



1.@PostConstruct注解

@PostConstruct注解是Java EE规范中的一部分,主要用于标记在一个Bean初始化完成后需要执行的方法。这个注解由JSR-250定义,并且在Spring框架以及其他遵循Java EE标准的应用服务器中广泛支持。

功能与用途:初始化方法,当容器完成对Bean的实例化并且所有依赖注入完成后,将会自动调用标有@PostConstruct注解的方法。这为开发者提供了一个机会,在对象正式投入使用之前进行一些必要的初始化工作,比如初始化资源、预计算某些值、启动后台任务等增强。

2. Publisher Confirms(发布者确认)

作用: Publisher Confirm机制允许RabbitMQ服务器通知生产者一个消息是否已经被交换机正确接收。当publisher-confirm-type设置为CORRELATED时,RabbitMQ会向生产者发送确认或否定响应,确认消息已到达交换机,但不保证消息已被路由到至少一个队列中。

生产者到交换机的确认(消息到达交换机)

2.1.配置:

spring.rabbitmq.publisher-confirm-type = CORRELATED

2.2. 代码实现

只要到达交换机就会触发

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {// 消息成功投递成功并被确认} else {// 消息未能正确投递}}
});

3.Publisher Returns(发布者退回)

作用: Publisher Return机制用于当消息无法按照路由键规则路由到任何队列时,或者由于其他原因(例如队列满、消息过大等)而被交换机拒绝时,RabbitMQ将消息返回给生产者。

交换机到队列的确认(消息是否正常发送到了队列)

通过实现 ReturnCallback 接口,发送消息失败返回,比如交换机路由不到队列时触发回调:

1.只有消息没有路由到队列的时候,才触发该回调 .

2.只要有一个队列接受到消息了,它就认为成功.

3.1 配置

spring.rabbitmq.publisher-returns = true

3.2 代码实现

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 处理未被路由或因某种原因被退回的消息}
});

4.完整代码

4.1消费者

/** Copyright (c) 2020, 2024,  All rights reserved.**/
package com.by.consumer;import cn.hutool.core.map.MapUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** <p>Project: spring-boot-rabbitMQ - DirectConsumer</p>* <p>Powered by scl On 2024-04-07 16:57:20</p>* <p>描述:<p>** @author 孙臣龙 [1846080280@qq.com]* @version 1.0* @since 17*/
@Configuration
public class ReliabilityConsumer2 {//注册队列@Beanpublic Queue queue1() {return QueueBuilder.durable("Re_Q01").deadLetterExchange("dead_E01").deadLetterRoutingKey("DK01").build();}//注册交换机@Beanpublic CustomExchange exchange() {Map<String, Object> map = MapUtil.of("x-delayed-type", "direct");return new CustomExchange("Re_E01", "x-delayed-message", true, false, map);}//绑定交换机和队列@Beanpublic Binding binding2() {return BindingBuilder.bind(queue1()).to(exchange()).with("RK01").noargs();}//注册一个死信交换机@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_E01");}//注册一个死信队列@Beanpublic Queue deadQueue() {return QueueBuilder.durable("dead_Q01").build();}//绑定死信交换机和死信队列@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DK01");}//启动一个消费者@RabbitListener(queues = "Re_Q01")public void receiveMessage(OrderKO msg) {System.out.println("消费者2:" + msg);}
}

 4.2生产者

/** Copyright (c) 2020, 2024,  All rights reserved.**/
package com.by.provider;import com.by.consumer.OrderKO;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;/*** <p>Project: spring-boot-rabbitMQ - DirectProvider</p>* <p>Powered by scl On 2024-04-07 17:06:41</p>* <p>描述:<p>** @author 孙臣龙 [1846080280@qq.com]* @version 1.0* @since 17*/
@Service
public class ReliabilityProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}//启动一个生产者public void send(OrderKO orderKO) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());System.out.println("callbackSender UUID: " + correlationData.getId());rabbitTemplate.convertAndSend("Re_E01","RK01",orderKO,m-> m,correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b) {System.out.println("消息发送成功");} else {System.out.println("消息发送失败");}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("消息丢失");}
}

4.3配置文件

spring.rabbitmq.publisher-confirm-type = CORRELATED
spring.rabbitmq.publisher-returns = true

4.4测试

 @Testvoid test6() throws InterruptedException, IOException {for (int i = 1; i <= 5; i++) {OrderKO orderKO = OrderKO.builder().id(i).name("孙臣龙" + i).build();System.out.println("发送消息"+i);reliabilityProvider.send(orderKO);}Thread.sleep(10000);//System.in.read();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/603929.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

密码应用方案测评要点及测评过程

&#xff08;1&#xff09;背景 《GBT39786-2021 信息系统密码应用基本要求》中第1-4级密码应用基本要求均包括“应依据相关标准和密码应用需求&#xff0c;制定密码应用方案”。第1-4级密码应用基本要求对于“投入运行前进行密码应用安全性评估”的具体如下。 第一级&#xff…

单例模式(饿汉模型,懒汉模型)

在着里我们先了解什么是单例模式。 就是某个类在进程中只能有单个实例&#xff0c;这里的单例模式需要一定的编程技巧&#xff0c;做出限制&#xff0c;一旦程序写的有问题&#xff0c;创建了多个实例&#xff0c;编程就会报错。 如果我们学会了单例模式&#xff0c;这种模式…

Fabric入门【00】简介

一、Hyperledger Fabric 1.IBM公司开发的联盟链&#xff1a;必须经过授权认证才能加入&#xff08;优点&#xff1a;避免了PoW资源开销、提高交易处理速率。缺点&#xff1a;去中心化程度减低&#xff09; 二、主要组成部分 1.Ledger&#xff1a; &#xff08;1&#xff09;…

微机原理——绪论

本篇文章是我在观看网课时记录的笔记。如有错误欢迎批评指正。 微机原理————绪论 我们在使用计算机时&#xff0c;最重要最核心的就是计算机的CPU(中央处理器)&#xff0c;决定了计算机的计算速度&#xff0c;但是CPU无法直接读取外界的温度、湿度、压力之类的物理量&…

前端html+css+js常用总结快速入门

&#x1f525;博客主页&#xff1a; A_SHOWY&#x1f3a5;系列专栏&#xff1a;力扣刷题总结录 数据结构 云计算 数字图像处理 力扣每日一题_ 学习前端全套所有技术性价比低下且容易忘记&#xff0c;先入门学会所有基础的语法&#xff08;cssjsheml&#xff09;&#xff…

用代码验证,esm 导出的是值的引用,commonjs导出的是值的拷贝

首先需要学习一下 esm 和 commonjs 的区别&#xff0c;其中一条关于导出值我们可以手动验证一下&#xff0c;先记住结论 esm 导出的是值的引用commonjs导出的是值的拷贝 没错我又遇到这个问题了&#xff0c;面试官先问我 commonjs 和 esm 有啥区别&#xff1f; 然后问如果 com…

Docker仅需3步搭建免费私有化的AI搜索引擎-FreeAskInternet

简介 FreeAskInternet 是一个完全免费、私有且本地运行的搜索引擎&#xff0c;并使用 LLM 生成答案&#xff0c;无需 GPU。用户可以提出问题&#xff0c;系统会进行多引擎搜索&#xff0c;并将搜索结果合并到ChatGPT3.5 LLM中&#xff0c;并根据搜索结果生成答案。 什么是 Fr…

什么是多路复用器滤波器

本章将更深入地介绍多路复用器滤波器&#xff0c;以及它们如何用于各种应用中。您将了解到多路复用器如何帮助设计人员创造出更复杂的无线产品。 了解多路复用器 多路复用器是一组射频(RF)滤波器&#xff0c;它们组合在一起&#xff0c;但不会彼此加载&#xff0c;可以在输出之…

PostgreSQL入门到实战-第九弹

PostgreSQL入门到实战 PostgreSQL数据过滤(二)官网地址PostgreSQL概述PostgreSQL中and操作理论PostgreSQL中and操作实操更新计划 PostgreSQL数据过滤(二) 了解PostgreSQL AND逻辑运算符以及如何使用它来组合多个布尔表达式。 官网地址 声明: 由于操作系统, 版本更新等原因, …

LeetCode-322. 零钱兑换【广度优先搜索 数组 动态规划】

LeetCode-322. 零钱兑换【广度优先搜索 数组 动态规划】 题目描述&#xff1a;解题思路一&#xff1a;Python动态规划五部曲&#xff1a;定推初遍举【先遍历物品 后遍历背包】解题思路二&#xff1a;Python动态规划五部曲&#xff1a;定推初遍举【先遍历背包 后遍历物品】解题思…

查分约束学习

问题模型&#xff1a; 有n个变量&#xff1a;&#xff0c;有m个约束条件 令差分数组&#xff0c;可以知道如果x1x2<q&#xff0c;那么与j和i-1有关联 由画图可知&#xff0c;如果有在i-1至j建立的有向图中跑最短路&#xff0c;那么dis[n]即为最小的约束变量 另外&#x…

如何在ngc中找到跟物理机驱动版本匹配的docker镜像

如何在ngc中找到跟物理机驱动版本匹配的docker镜像 1.nvidia-smi查看CUDA版本2. [打开ngc官网](https://catalog.ngc.nvidia.com/orgs/nvidia/containers/pytorch/layers)3.直到找到CUDA版本对应的Tag【比如CUDA 12.1的tag是 23.07-py3】4.拉取镜像 [Tag&#xff1a;23.07-py3]…