RabbitMQ常见生产问题详解

目录

RabbitMQ如何保证消息不丢失?

哪些环节会有丢消息的可能?

RabbitMQ消息零丢失方案

1. 生产者保证消息正确发送到RibbitMQ

2. RabbitMQ消息存盘不丢消息

3. RabbitMQ 主从消息同步时不丢消息

4. RabbitMQ消费者不丢失消息

如何保证消息幂等?

如何保证消息的顺序?

关于RabbitMQ的数据堆积问题


RabbitMQ如何保证消息不丢失?

哪些环节会有丢消息的可能?

              

      其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。这也是任何用户态的应用程序无法避免的。


RabbitMQ消息零丢失方案

1. 生产者保证消息正确发送到RibbitMQ

       对于单个数据,可以使用生产者确认机制。通过多次确认的方式,保证生产者的消息能够正确的发送到RabbitMQ中。

       RabbitMQ的生产者确认机制分为同步确认和异步确认。同步确认主要是通过在生产者端使用Channel.waitForConfirmsOrDie()指定一个等待确认的完成时间。异步确认机制则是通过channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)在生产者端注入两个回调确认函数。第一个函数是在生产者消息发送成功时调用,第二个函数则是生产者消息发送失败时调用。两个函数需要通过sequenceNumber自行完成消息的前后对应。sequenceNumber的生成方式需要通过channel的序列获取。int sequenceNumber = channel.getNextPublishSeqNo();之前文章中做过介绍。

        当前版本的RabbitMQ,可以在Producer中添加一个ReturnListener,监听那些成功发到Exchange,但是却没有路由到Queue的消息。如果不想将这些消息返回给Producer,就可以在Exchange中,也可以声明一个alternate-exchange参数,将这些无法正常路由的消息转发到指定的备份Exchange上。  

​        如果发送批量消息,在RabbitMQ中,另外还有一种手动事务的方式,可以保证消息正确发送。手动事务机制主要有几个关键的方法: channel.txSelect() 开启事务; channel.txCommit() 提交事务; channel.txRollback() 回滚事务; 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑,并且手动事务会对channel产生阻塞,造成吞吐量下降。

2. RabbitMQ消息存盘不丢消息

       对于Classic经典队列,直接将队列声明成为持久化队列即可。而新增的Quorum队列和Stream队列,都是明显的持久化队列,能更好的保证服务端消息不会丢失。

3. RabbitMQ 主从消息同步时不丢消息

       RabbitMQ的集群架构。普通集群模式,消息是分散存储的,不会主动进行消息同步了,是有可能丢失消息的。镜像模式集群,数据会主动在集群各个节点当中同步,这时丢失消息的概率不会太高。另外,启用Federation联邦机制,给包含重要消息的队列建立一个远端备份,也可以降低消息丢失的概率。

4. RabbitMQ消费者不丢失消息

       RabbitMQ在消费消息时可以指定是自动应答,还是手动应答。如果是自动应答模式,消费者会在完成业务处理后自动进行应答,而如果消费者的业务逻辑抛出异常,RabbitMQ会将消息进行重试,这样是不会丢失消息的,但是有可能会造成消息一直重复消费。

将RabbitMQ的应答模式设定为手动应答可以提高消息消费的可靠性。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body)throws IOException {long deliveryTag = envelope.getDeliveryTag();channel.basicAck(deliveryTag, false);}});
channel.basicConsume(queueName, true, myconsumer);

任何用户态的应用程序都无法保证绝对的数据安全,所以备份与恢复的方案都需要考虑。


如何保证消息幂等?

       当消费者消费消息处理业务逻辑时,如果抛出异常,或者不向RabbitMQ返回响应,默认情况下,RabbitMQ会无限次数的重复进行消息消费。处理幂等问题,可以设定RabbitMQ的重试次数。在SpringBoot集成RabbitMQ时,可以在配置文件中指定spring.rabbitmq.listener.simple.retry开头的一系列属性,来制定重试策略。​ 然后,需要在业务上处理幂等问题。

       处理幂等问题的关键是要给每个消息一个唯一的标识。在SpringBoot框架集成RabbitMQ后,可以给每个消息指定一个全局唯一的MessageID,在消费者端针对MessageID做幂等性判断。

//这里用的message要是org.springframework.amqp.core.Message
//发送者指定ID字段
Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.send(message2);
//消费者获取MessageID,自己做幂等性判断
@RabbitListener(queues = "fanout_email_queue")
public void process(Message message) throws Exception {// 获取消息IdString messageId = message.getMessageProperties().getMessageId();...
}

       原生API当中,也是支持MessageId的。比如,针对订单消息,那就用订单ID来做唯一键。在RabbitMQ中,消息的头部就是一个很好的携带数据的地方。

// ==== 发送消息时,携带sequenceNumber和orderNo
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
//携带消息ID
builder.messageId(""+channel.getNextPublishSeqNo());
Map<String, Object> headers = new HashMap<>();
//携带订单号
headers.put("order", "123");
builder.headers(headers);
channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8"));// ==== 接收消息时,拿到sequenceNumber
Consumer myconsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body)throws IOException {//获取消息IDSystem.out.println("messageId:"+properties.getMessageId());//获取订单IDproperties.getHeaders().forEach((key,value)-> System.out.println("key: "+key +"; value: "+value));// (process the message components here ...)//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。//没有答复过的消息,服务器会一直不停转发。channel.basicAck(deliveryTag, false);}};
channel.basicConsume(QUEUE_NAME, false, myconsumer);

如何保证消息的顺序?

​       某些场景下,需要保证消息的消费顺序,例如一个下单过程,需要先完成扣款,然后扣减库存,然后通知快递发货,这个顺序不能乱。如果每个步骤都通过消息进行异步通知的话,这一组消息就必须保证他们的消费顺序是一致的。

       在RabbitMQ当中,针对消息顺序的设计其实是比较弱的。唯一比较好的策略就是 单队列+单消息推送。即一组有序消息,只发到一个队列中,利用队列的FIFO特性保证消息在队列内顺序不会乱。显然这是以极度消耗性能作为代价的,在实际适应过程中,应该尽量避免这种场景。然后在消费者进行消费时,保证只有一个消费者,同时指定prefetch属性为1,即每次RabbitMQ都只往客户端推送一个消息。

spring.rabbitmq.listener.simple.prefetch=1

       在多队列情况下,如何保证消息的顺序性,目前使用RabbitMQ的话,还没有比较好的解决方案。在使用时,应该尽量避免这种情况。


关于RabbitMQ的数据堆积问题

        RabbitMQ一直以来都有一个缺点,就是对于消息堆积问题的处理不好。当RabbitMQ中有大量消息堆积时,整体性能会严重下降。而目前新推出的Quorum队列以及Stream队列,目的就在于解决这个核心问题。但是这两种队列的稳定性和周边生态都还不够完善,因此,在使用RabbitMQ时,还是要非常注意消息堆积的问题。尽量让消息的消费速度和生产速度保持一致。

       如果确实出现了消息堆积比较严重的场景,就需要从数据流转的各个环节综合考虑,设计适合的解决方案。

消息生产者端

​         最明显的方式自然是降低消息生产的速度。但是,生产者端产生消息的速度通常是跟业务息息相关的,一般情况下不太好直接优化。可以选择尽量多采用批量消息的方式,降低IO频率。

RabbitMQ服务端

       RabbitMQ本身其实也在着力于提高服务端的消息堆积能力。对于消息堆积严重的队列,可以预先添加懒加载机制,或者创建Sharding分片队列,这些措施都有助于优化服务端的消息堆积能力。

消息消费者端

       最直接的方式,就是增加消费者数量。尤其当消费端的服务出现问题,已经有大量消息堆积时。这时,可以尽量多的申请机器,部署消费端应用,争取在最短的时间内消费掉积压的消息。

 对于单个消费者端,可以通过配置提升消费者端的吞吐量。例如

# 单次推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5

       灵活配置相关参数,能够在一定程度上调整每个消费者实例的吞吐量,减少消息堆积数量。当遇到紧急状况,来不及调整消费者端时,可以紧急上线一个消费者组,专门用来将消息快速转录。保存到数据库或者Redis,然后再慢慢进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/439148.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kubernetes-快速部署一套k8s集群

1、前置知识点 1.1 生产环境可部署Kubernetes集群的两种方式 目前生产部署Kubernetes集群主要有两种方式&#xff1a; kubeadm Kubeadm是一个K8s部署工具&#xff0c;提供kubeadm init和kubeadm join&#xff0c;用于快速部署Kubernetes集群。 二进制包 从github下载发行…

高端车规MCU的破局之路

目录 1 低质量的无效内卷 2 高端车规MCU产品共性 2.1 支持标定测量 2.2 低延迟通信加速 2.3 完备的网络安全解决方案 2.4虚拟化 3 国产替代的囚徒困境 1 低质量的无效内卷 近几年&#xff0c;车规MCU国产替代的呼声此消彼长&#xff0c;但仍然集中在低端产品。 从产…

静态分析Golang语言生成函数调用关系的利器——go-callvis

目录 升级go删除旧版本安装新版本配置环境变量载入环境修改当前环境修改之后进入的环境 分析安装go-callvis分析其他包总结 导出文件总结 清晰主体脉络总结 其他 参考资料 不同于之前分析C语言项目的工具&#xff0c;go-callvis还是很方便使用。只要把两项工作做好就能顺利的使…

蓝桥杯AT24C02问题记录

问题1&#xff1a;从这个图片上可以看出这两个在IIC的.c文件里延时时间不一样&#xff0c;第一张图使用了15个_nop_(); 12M晶振机器周期是 1/12M*121uS&#xff1b;nop()要延时1个指令周期。延时时间不对会对时序产生影响&#xff0c;时序不对&#xff0c;则AT24C02有没被使用…

C语言KR圣经笔记 5.12 复杂声明

5.12 复杂声明 C 语言有时会因为声明的语法而受到谴责&#xff0c;特别是涉及函数指针的声明语法。语法试图使声明和使用一致&#xff1b;在简单的情况下它的效果不错&#xff0c;但在更复杂的情况下会让人困惑&#xff0c;因为声明不能从左往右读&#xff0c;而且括号被过度使…

【Linux网络编程】网络编程套接字(1)

【Linux网络编程】网络编程套接字(1) 目录 【Linux网络编程】网络编程套接字(1)源IP地址和目的IP地址端口号端口号和进程ID的关系 网络通信TCP协议UDP协议网络字节序socket编程接口简单的UDP网络程序 作者&#xff1a;爱写代码的刚子 时间&#xff1a;2024.1.29 前言&#xff1…

Go语言中HTTP代理的请求和响应过程

在Go语言中&#xff0c;HTTP代理的实现涉及对请求和响应的拦截、转发和处理。下面将详细介绍这个过程。 请求过程&#xff1a; 客户端发起请求&#xff1a;客户端&#xff08;例如浏览器或其他应用程序&#xff09;发送HTTP请求到代理服务器。建立连接&#xff1a;代理服务器…

Git怎样用?(下载到本地,和在本地初始化)

全局设置&#xff1a; 点击第二个 输入&#xff1a; 例如&#xff1b;邮箱是随意地 git config --global user.name "名字" git config --global user.email "邮箱" 获取git仓库 本地初始化&#xff1a; 创建仓库 右键第二个 输入 git init 克隆&#…

nssctf round17

level1 基础共模攻击 # #真签到题 # from Crypto.Util.number import bytes_to_long, getPrime # from secret import getflag # # e1 getPrime(1024) # e2 getPrime(1024) # n e1 * e2 # m bytes_to_long(getflag().encode()) # c1 pow(m, e1, n) # c2 pow(m, e2, n) …

血细胞分类项目

血细胞分类项目 数据集&#xff1a;血细胞分类数据集数据处理 dataset.py网络 net.py训练 train.py拿训练集的几张图进行预测 数据集&#xff1a;血细胞分类数据集 https://aistudio.baidu.com/datasetdetail/10278 数据处理 dataset.py from torchvision import transfor…

Codeforces Round 799 (Div. 4)

目录 A. Marathon B. All Distinct C. Where’s the Bishop? D. The Clock E. Binary Deque F. 3SUM G. 2^Sort H. Gambling A. Marathon 直接模拟 void solve() {int ans0;for(int i1;i<4;i) {cin>>a[i];if(i>1&&a[i]>a[1]) ans;}cout<&l…

webassembly003 TTS BARK.CPP

TTS task TTS&#xff08;Text-to-Speech&#xff09;任务是一种自然语言处理&#xff08;NLP&#xff09;任务&#xff0c;其中模型的目标是将输入的文本转换为声音&#xff0c;实现自动语音合成。具体来说&#xff0c;模型需要理解输入的文本并生成对应的语音输出&#xff0…