消息中间件RabbitMq如何保证消息的可靠性

关于保证消息的可靠性,可以从rabbitmq的组成部分来分析,第一部分发送方,第二部分服务端,第三部分消费方,第四部分兜底部分

1.生产者发送确认机制,当生产者发送消息到rabbitmq后,rabbitmq会给生产者一个确认,告诉生产者这个消息我收到且保存

java代码

channel.confirmSelect();  // 启用生产者确认

  

if (channel.waitForConfirms()) {System.out.println("Message successfully delivered!");
} else {System.out.println("Message delivery failed!");
}

  

import com.rabbitmq.client.*;public class PublisherConfirmProducer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明持久化队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启用发布者确认模式channel.confirmSelect();String message = "Hello RabbitMQ with Publisher Confirms!";// 发布消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());// 等待确认if (channel.waitForConfirms()) {System.out.println("Message successfully delivered!");} else {System.out.println("Message delivery failed!");}}}
}

2.服务端方面,消息持久化,也就是把消息保存到磁盘中,即rabbitmq重启后,队列依旧存在,未消费的消息依旧存在

java代码方面创建持久化队列,可以消息持久化

import com.rabbitmq.client.*;public class Producer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明一个持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Hello RabbitMQ!";// 发布持久化消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), // 设置消息持久化message.getBytes());System.out.println("Sent: " + message);}}
}

3.从消费方设计,消息消费后进行一个手动的确认,取代原有的自动确认

java代码方面

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明持久化队列(即便消费者已经运行,这里仍然要声明队列)channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置消费者回调,手动确认消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);// 发送确认,表示已成功处理该消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费消息,设置自动确认为 false,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}}
}

4.兜底方案,即死信队列,你可能会问"消费者未能消费的,不做确认不就好了,消息依旧在rabbitmq啊,为啥要使用死信队列呢",这里使用死信队列,是为了这些失败的消息避免被重复消息,因为不可控制的原因,消费失败的问题短时间会持续存在,使用死信队列为了给正常的

消息腾空间,同时使用死信队列,也是为了更好的分析失败原因,总体来说就是好坑位要留给需要的人

java代码使用死信队列

import com.rabbitmq.client.*;public class DeadLetterProducer {private final static String QUEUE_NAME = "main_queue";private final static String DLQ_NAME = "dead_letter_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 创建死信队列channel.queueDeclare(DLQ_NAME, true, false, false, null);// 创建主队列并设置死信交换机和路由键channel.queueDeclare(QUEUE_NAME, true, false, false, Map.of("x-dead-letter-exchange", "", "x-dead-letter-routing-key", DLQ_NAME));String message = "Message for Dead Letter Queue";// 发布消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());System.out.println("Sent to main queue: " + message);}}
}

  以上从四个方面介绍保证rabbitmq消息可靠性措施

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/891339.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

厂房AI火焰识别网络摄像机

厂房AI火焰识别网络摄像机通过深度学习算法,能够识别火焰的细微特征,即使在复杂环境背景下也能准确判断,在设计上借助传感器过滤掉图像上像火的物体,比如车尾灯,晚霞。算法帮助传感器过滤掉带有辐射的物体,比如人体,汽车尾气,太阳光等。使得误报率几乎不会存在,大大降低…

乱扔垃圾行为检测系统

乱扔垃圾行为检测系统基于YOLOX+RNN的深度学习算法,乱扔垃圾行为检测系统通过前端摄像头一旦检测到乱扔垃圾行为,系统会立即发出警报,通知相关人员及时处理,从而起到保障社会卫生的作用。本系统通过安装在垃圾桶周围的摄像头,实时监测垃圾桶内的垃圾量。当垃圾桶内的垃圾达…

现代CPU调优3: CPU 微架构

3 CPU CPU 微架构 本章简要概述了对软件性能有直接影响的关键 CPU 微体系结构特性。本章的目的并不是要涵盖 CPU 架构的所有细节和权衡,文献[Hennessy & Patterson, 2017 Computer Architecture, Sixth Edition]、[Shen & Lipasti, 2013 Modern Processor Design: Fun…

山体落石滑坡识别系统 落石泥石流监控摄像机

山体落石滑坡识别系统 落石泥石流监控摄像机基于YOLOX+RNN的深度学习算法,山体落石滑坡识别系统 落石泥石流监控摄像机通过安装在山区公路沿线的监控摄像机来实现对山体的实时监测。这些摄像机分布在关键位置,如山体易滑坡区域、桥梁附近等,能够24小时不间断地捕捉山体的动态…

MySQL语句查询——子查询和三表查询

一、子查询 1、定义:一个查询中嵌套另一个查询 2、子查询的分类 (1)标量子查询 (2)列子查询 (3)行子查询 (4)表子查询(运用多) 3、子查询详解 (1)标量子查询(返回一个值) -把一个sql 执行返回的一个值,作为另一个sql的条件,得到的结果是一行一列,一般出现在…

leetcode hot 14

解题思路:这题思路有很多,动态规划,前缀和等,前缀和就是遍历一遍,将每个前缀和与前面最小的前缀和相减,就能得到最大值,然后比较与记录最大值。(还有一种思路就是首先明确最大子串内部一定不会存在某个边缘子串小于0,所以可以遍历一遍先记录继续记录前缀和,然后比较ma…

如何在React.js中使用Shadcn/UI

如何在React.js中使用Shadcn/UI 学习如何在React.js中使用Shadcn/UI构建可自定义且轻量的界面。了解如何将其与Apipost集成,以实现高效的API管理和测试。非常适合希望提升React.js项目的开发者!使用Shadcn/UI构建现代化界面 创建简洁的用户界面是前端开发者的主要目标之一。随…

若依开发遇到的问题五

今天在写pdf上传文件接口的时候发生以下的情况:路径很明确,所以直接找过来:是这个类没有下载完成,顶端有提示下载,点击下载,问题解决

[深度学习] 大模型学习2-提示词工程指北

在文章大语言模型基础知识里,提示词工程(Prompt Engineering)作为大语言模型(Large Language Model,LLM)应用构建的一种方式被简要提及,本文将着重对该技术进行介绍。 提示词工程就是在和LLM聊天时,用来让模型回答得更好的一种方法。LLM的工作原理是猜下一个字或词是什…

20250228打卡

大创项目初版完工

Meta 无预警发布新一代 AI 眼镜 Aria Gen 2;腾讯混元 Turbo S 模型将长短思维链融合丨日报

开发者朋友们大家好:这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的 技术 」、「有亮点的 产品 」、「有思考的 文章 」、「有态度的 观点 」、「有看点的 活动 」,但内容仅代表编辑…

关于我在使用Steamlit中碰到的问题及解决方案总结

Steamlit 并不支持一个可以预览本地文件的路径选择器(并不上传文件) 解决方案:使用 Python 自带的 tkinter 来完成 参考:【Streamlit 选择文件夹的曲折方案】Streamlit选择文件夹-CSDN博客 import streamlit as st from tkinter import filedialog, Tk# Set up tkinter roo…