RabbitMQ(六)消息的持久化

目录

    • 一、简介
      • 1.1 定义
      • 1.2 消息丢失的场景
    • 二、交换机的持久化
      • 方式一:直接 new
      • 方式二:channel.exchangeDeclare()
      • 方式三:ExchangeBuilder【推荐】
    • 三、队列的持久化
      • 方式一:直接 new
      • 方式二:channel.queueDeclare()
      • 方式三:QueueBuilder【推荐】
    • 四、消息的持久化
      • 方式一:channel.basicPublish()
      • 方式二:rabbitTemplate.convertAndSend()【推荐】
    • 五、持久化问题

在这里插入图片描述

一、简介

1.1 定义

  • 持久化: 是为了提高 RabbitMQ 消息的可靠性,防止在异常情况(重启宕机)下数据的丢失。
  • RabbitMQ 持久化分为三部分:交换机的持久化队列的持久化消息的持久化

1.2 消息丢失的场景

出现消息丢失的场景有:

  • 生产者发送消息丢失:发送消息到 RabbitMQ Server 异常。 可能因为网络问题造成 RabbitMQ 服务端无法收到消息。——解决方案:ConfirmCallback
  • 生产者发送消息丢失:消息无法路由到指定队列。 可能由于代码层面或配置层面错误导致消息路由到指定队列失败。——解决方案:ReturnCallback
  • RabbitMQ Server 存储消息丢失:消息未完全持久化到磁盘。 可能因为 RabbitMQ 宕机导致消息未完全持久化,或队列丢失从而导致消息丢失等持久化问题。——解决方案:集群部署,实现高可用
  • 消费者消费消息丢失:消费者消费消息异常。 可能在消费者接收消息后,还没来得及消费消息,消费者就发生宕机、故障等问题。——解决方案:消费端手动确认消息

二、交换机的持久化

方式一:直接 new

直接实例化对应的 Exchange 实现类即可,默认:持久化的、非自动删除的

@Bean
public DirectExchange directExchange() {return new DirectExchange(directExchange);
}

Exchange 源码:

package org.springframework.amqp.core;public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {...public AbstractExchange(String name) {// 默认是持久化的、非自动删除的。this(name, true, false);}...
}

方式二:channel.exchangeDeclare()

在 RabbitMQ 的原生 API 中,例如:Java 客户端 API,声明持久化交换机时,需要将 durable 参数设置为 true

// 声明交换机:(交换机名称,交换机类型,持久化)
channel.exchangeDeclare("myExchange", "direct", true);

方式三:ExchangeBuilder【推荐】

在 Spring AMQP 中,我们可以使用 ExchangeBuilder 来创建持久化的交换机:

import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;Exchange exchange = ExchangeBuilder.directExchange("myExchange").durable(true) // 交换机持久化.build();

三、队列的持久化

方式一:直接 new

直接实例化 Queue 类即可,默认:持久化的、非独占的、非自动删除的

@Bean
public Queue myQueue() {return new Queue("myQueue");
}

Queue 源码:

package org.springframework.amqp.core;public Queue(String name) {// 默认是持久化的、非独占的、非自动删除的。this(name, true, false, false);
}

方式二:channel.queueDeclare()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化队列时,需要将 durable 参数设置为 true

// 声明队列:(队列名称,持久化,非独占,非自动删除,可选队列参数)
channel.queueDeclare("myQueue", true, false, false, null);

方式三:QueueBuilder【推荐】

在 Spring AMQP 中,可以使用 QueueBuilder 来创建持久化的队列:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;Queue queue = QueueBuilder.durable("myQueue").build();

四、消息的持久化

即使队列时持久化的,发送到队列中的消息默认情况下 可能仍然是非持久化的。要使消息持久化,需要在发送消息时设置消息属性为持久化。

方式一:channel.basicPublish()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化消息时,需要将 deliveryMode 参数设置为 2

import com.rabbitmq.client.AMQP;AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 这里2对应MessageDeliveryMode.PERSISTENT.build();// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", props. messageBytes);

或者,我们可以直接使用 MessageProperties.PERSISTENT_TEXT_PLAIN 来进行指定消息的持久化:

import com.rabbitmq.client.MessageProperties;// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

方式二:rabbitTemplate.convertAndSend()【推荐】

在 Spring AMQP 中,可以使用 rabbitTemplate.convertAndSend() 来创建持久化的消息:

rabbitTemplate.convertAndSend("myQueue", body, message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;
});

通过上述步骤,消息会被持久化存储,只要队列也被正确配置为持久化,即使 RabbitMQ 服务器重启,消息也将被保留下来。

补充:

然而,即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘


五、持久化问题

思考:将交换机、队列、消息都设置持久化之后就能保证数据不会丢失吗?

答:当然不能,需要从多方面考虑:

  • 场景1: 如果消费者订阅队列的时候将 autoAck(自动确认)设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那消息也会丢失。

    解决方案: 将消费者的消息确认机制改为手动确认,带处理完消息之后,手动删除消息。

  • 场景2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 服务器中的消息还没来得及持久化,即没有将数据写入磁盘时,如果服务器发生异常,消息也会丢失。

    解决方案: 可以 通过 RabbitMQ 集群的方式实现消息中间件的高可用

因此,还需要做好生产者和消费者的 消息确认机制 以及通过 集群 的方式来实现 RabbitMQ 的高可用。

整理完毕,完结撒花~ 🌻





参考地址:

1.RabbitMQ保证消息可靠性,https://www.cnblogs.com/auguse/articles/17712620.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/337277.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux Capabilities 基础概念与基本使用

目录 1. Linux capabilities 是什么? 2. capabilities 的赋予和继承 线程的 capabilities Permitted* 允许 Effective* 有效 Inheritable* 遗传 Bounding(集合) Ambient 文件的 capabilities Permitted Inheritable Effective 3…

一文了解Git(所有命令)附带图片

我是南城余!阿里云开发者平台专家博士证书获得者! 欢迎关注我的博客!一同成长! 一名从事运维开发的worker,记录分享学习。 专注于AI,运维开发,windows Linux 系统领域的分享! 其他…

nginx(1.13.7)首次安装出现:【make: *** 没有规则可以创建“default”需要的目标“build” 问题】解决措施

目录 前言: 一.龙蜥(Anolis)操作系统上安装GCC 1.安装gcc 2.检验安装 二.安装出现 make: *** 没有规则可以创建“default”需要的目标“build” 问题 1.解压安装nginx 2.安装出现问题展示 3.解决措施 4.重新编译进行安装 5…

内网渗透之CobaltStrike(上)

目录 一、Cobalt Strike简介 二、Cobalt Strike基本用法 1、启动服务端 2、客户端连接 3、设置监听器(Listeners) 4、脚本管理器(Script Manager) 5、攻击(最常用的是生成后门) 6、CS上线 7、Beaco…

C#编程-实现在文本文件中的读和写

实现在文本文件中的读和写 Stream类用于从文本文件读取数据和向文本文件写入数据。它是一个抽象类,支持向流读写字节。如果文件的数据仅是文本,那么您可以使用StreamReader类和StreamWriter类来完成相应的读和写任务。 StreamReader类 StreamReader类继承自从抽象类TextRea…

C2-4.3.1 多个决策树——随机森林

C2-4.3.1 多个决策树——随机森林 参考链接 1、为什么要使用多个决策树——随机森林? 决策树的缺点: A small change in the data can cause a large change in the structure of the decision tree causing instability 即:对数据集 中…

vue获取当前系统时间

1.获取当前系统时间时分秒 // 标准时间格式转化为年月日时分秒 export function ssDateTimeFn(timestamp) {if (!timestamp) {return timestamp}// timestamp是整数,否则要parseInt转换,不会出现少个0的情况const time new Date(timestamp)const year time.getFu…

代码随想录算法训练营第15天 | 102. 二叉树的层序遍历 + 226. 翻转二叉树 + 101. 对称二叉树

今日内容 102.层序遍历 226.翻转二叉树 101.对称二叉树 102.二叉树的层序遍历 - Medium 题目链接:力扣-102. 二叉树的层序遍历 给你二叉树的根节点 root ,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点&…

2024年软考网络工程师如何备考?考什么?

先看一下这知识点总结图,在备考复习前大致简单了解一遍! 网工考试时间安排: 网工每年考两次,5月考试一次,11月考试一次 第一步: 通读教程(《网络工程师》),首先对教程中…

Linux学习之网络编程(纯理论)

写在前面 刚刚更新完Linux系统编程,特别推荐大家去看的Linux系统编程,总共44个小时,老师讲的非常好,我是十天肝完的,每天大概看20集,每天还要以写blog的形式来写笔记来总结一下,虽然这十天有点…

交叉编译ARM64架构electron详解

基本介绍 本文主要参考Electron官方文档中 构建说明 和 构建步骤(Linux) 在amd64环境内构建arm64的electron包。 如果是arm64环境请查看文章arm64架构编译electron长征路 一、环境说明 操作系统版本:统信1060 操作系统架构:amd64 内存:32G 如下图: electron版本:v25…

vue+百度地图根据后端返回的经纬度坐标实现地图点位添加

1.效果图 2.准备工作 public/index <script src"http://api.map.baidu.com/api?typewebgl&v2.0&aksRDDfAKpCSG5iF1rvwph4Q95M6tDCApL"></script> 3.html <div id"vehicleMap"></div> 4.js data() {return {url: /…