RabbitMQ知识掌握 【进阶篇】

一、如何保证消息的可靠性 🍉

1.保证消息的可靠性投递 🥝

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

消息从生产者到消费者的经历哪些组件:
生产者–交换机—队列—消费者

  • confirm 确认模式
  • return 退回模式
  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

默认rabbitmq不开启上面两种模式。

我们将利用这两个 callback 控制消息的可靠性投递

在这里插入图片描述
confirm和return的实现

  1. 设置ConnectionFactory的publisher-confirm-type: correlated开启 确认模式。

  2. 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  3. 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。

  4. 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

confirm机制

(1)开启confirm

在这里插入图片描述

#开启confirm机制  默认为none会自动删除数据  开启手动模式 correlated
spring.rabbitmq.publisher-confirm-type=correlated

(2)设置rabbitTemplate的confirmCallback回调函数

在这里插入图片描述

  @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void text01(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (!b)System.out.println("系统繁忙请重新发送");}});rabbitTemplate.convertAndSend("zt_exchange","b.aaa", "你好,阿娇");}

return机制

在这里插入图片描述

#开启return机制  用来捕捉虚拟主机向队列中传递信息错误
spring.rabbitmq.publisher-returns=true

(2)设置回调

在这里插入图片描述

@Testpublic void text02(){rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//当交换机发送消息到队列过程中失败启动当前方法System.out.println(replyCode);}});rabbitTemplate.convertAndSend("zt_exchange","b.v", "你好,阿娇");}

2.如何保证消息在队列中不丢失🥝

(1)设置队列为持久化

在这里插入图片描述

(2)设置消息的持久化

生产消息不设置过期时间默认为持久化

在这里插入图片描述

3.确保消息能可靠的被消费掉🥝

ACK确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

这样就解决了,即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

ACK的实现

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge=“none”
手动确认:acknowledge=“manual”
根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,并且不常用,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

在这里插入图片描述

#修改消费端  手动确认消息  默认自动确认 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2)修改代码

在这里插入图片描述

package com.lzq.listener;import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Component//交于容器创建并管理
public class MyListener {@RabbitListener(queues={"zt_queue01"})
//queues:表示你监听的队列名public void hello01(Message message , Channel channel) throws IOException { //把监听到的消息封装到Mmessage类对象中long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] body = message.getBody();String s= new String(body);System.out.println("123");// Map map = JSON.parseObject(s, HashMap.class);try {System.out.println("消息的内容:"+s);//basicAck:确认消息 -- rabbit服务端删除/** long deliveryTag,第一个参数为消息的标志* boolean multiple 第二个参数为是否把该消费之前未确认的消息一起确认掉* */channel.basicAck(deliveryTag,true);}catch (Exception e){//basicNack:服务继续发送消息/** long deliveryTag, boolean multiple, 前两个参数与上面的意义一样* boolean requeue 是否要求rabbitmq服务器重新发送该消息* */channel.basicNack(deliveryTag,true,true);}
//业务操作}
}

总结: 如何保证消息的可靠性?

[1] 保证消息的可靠性投递: confirm机制和return机制

[2] 队列中:—持久化

[3]使用ack机制保证消费者的可靠性消费。

二、延迟队列🍉

TTL🥝

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
根据设置的时间十秒后会消失

 @Testpublic void test04(){/*** String exchange, String routingKey, Object message,* CorrelationData correlationData*/Message message =new Message("咻咻咻".getBytes());message.getMessageProperties().setExpiration("10000");rabbitTemplate.send("bbb","a.b",message);}

小结:

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

死信队列🥝

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

在这里插入图片描述

什么样的消息会成为死信消息

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

在这里插入图片描述

在这里插入图片描述

延迟队列🥝

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器:性能差—每隔一段时间要进行数据库查询。

  2. 延迟队列

通过消息队列完成延迟队列的功能

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

在这里插入图片描述

三、 如何防止消费者重复消费消息🍉

消息的幂等性—无论操作几次结果都是一样。

1、生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。

2、如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。

3、如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。

在这里插入图片描述

简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。

生产者 🥝

package com.ykq.score.producer;import com.alibaba.fastjson.JSONObject;
import com.xiaojie.score.entity.Score;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
@Slf4j
public class ScoreProducer implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//定义交换机private static final String SCORE_EXCHANGE = "ykq_score_exchaneg";//定义路由键private static final String SCORE_ROUTINNGKEY = "score.add";/*** @description: 订单完成* @param:* @return: java.lang.String* @author xiaojie* @date: 2023/7/10 22:30*/public String completeOrder() {String orderId = UUID.randomUUID().toString();System.out.println("订单已完成");//发送积分通知Score score = new Score();score.setScore(100);score.setOrderId(orderId);String jsonMSg = JSONObject.toJSONString(score);sendScoreMsg(jsonMSg, orderId);return orderId;}@Asyncpublic void sendScoreMsg(String jsonMSg, String orderId) {this.rabbitTemplate.setConfirmCallback(this);rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {//设置消息的id为唯一message.getMessageProperties().setMessageId(orderId);return message;});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);} else {log.info(">>>>>>>消息发送失败{}", ack);}}
}

消费者🥝

package com.xiaojie.score.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.score.entity.Score;
import com.xiaojie.score.mapper.ScoreMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
@Slf4j
public class ScoreConsumer {@Autowiredprivate ScoreMapper scoreMapper;@RabbitListener(queues = {"ykq_score_queue"})public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {String orderId = message.getMessageProperties().getMessageId();if (StringUtils.isBlank(orderId)) {return;}log.info(">>>>>>>>消息id是:{}", orderId);String msg = new String(message.getBody());Score score = JSONObject.parseObject(msg, Score.class);if (score == null) {return;}//执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息Score dbScore = scoreMapper.selectByOrderId(orderId);if (dbScore != null) {//证明已经消费消息,告诉mq已经消费,丢弃消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}Integer result = scoreMapper.save(score);if (result > 0) {//积分已经累加,删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;} else {log.info("消费失败,采取相应的人工补偿");} }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/22271.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微软亚洲研究院推出AI编译器界“工业重金属四部曲”

编者按&#xff1a;编译器在传统计算科学中一直是一个重要的研究课题。在人工智能技术快速发展和广泛应用的今天&#xff0c;人工智能模型需要部署在多样化的计算机硬件架构上。同时&#xff0c;训练和部署大型人工智能模型时又对硬件性能有着更高的要求&#xff0c;有时还需根…

7.10蓝桥杯刷题

public class _求阶乘和 {public static void main(String[] args) {// 根据已有的知识 可以知道的是&#xff0c;现在要求s的末尾九位数字&#xff0c;已知的是39之后的阶乘他的后九位都是0;//所以不需要计算到2023的阶乘//一个数求出来的阶乘想要末尾有0//数中必须要有2和5&a…

高压放大器到底有什么作用

高压放大器是一种重要的电子元器件&#xff0c;其作用是将信号放大到更高的电压水平&#xff0c;以便供给需要高电压的负载使用。高压放大器被广泛应用于通讯设备、医疗仪器、仿真模拟、气体激光、光学器件等领域。下面安泰电子将详细介绍高压放大器的作用以及其在各领域中的应…

VUE2基础-Vue实例

Vue 实例 创建一个 Vue 实例 每个 Vue 应用都是通过用 Vue 函数创建一个新的 Vue 实例开始的&#xff1a; var vm new Vue({// 选项 }) 虽然没有完全遵循 MVVM 模型&#xff0c;但是 Vue 的设计也受到了它的启发。因此在文档中经常会使用 vm (ViewModel 的缩写) 这个变量名…

抖音seo矩阵系统源码搭建技术+二开开源代码定制部署

抖音SEO源码是指将抖音平台上的视频资源进行筛选、排序等操作&#xff0c;进而提升其在搜索排名中的权重&#xff0c;从而让更多的用户能够发现并观看到这些视频资源。而抖音SEO矩阵系统源码则是指通过建立一个分析系统&#xff0c;分析抖音中的用户、视频、标签等数据&#xf…

Spring 核心与设计思想

文章目录 1.Spring 是什么&#xff1f;2.什么是容器3.什么是IoC3.1 传统程序开发3.2 IoC程序开发3.3 IoC再理解 4.认识DI 1.Spring 是什么&#xff1f; Spring框架是一个轻量级的企业开发的一站式解决方案&#xff0c;可以基于Spring解决Java EE 开发中的所有问题。 ⽤⼀句大白…

Cortex-M3与Aurix的堆栈

1. TC397是一个基于ARM Cortex-M3内核的微控制器芯片&#xff0c;其堆栈是由系统初始化代码初始化的。在ARM Cortex-M3架构中&#xff0c;堆栈通常由两个寄存器来管理&#xff1a;主堆栈指针&#xff08;MSP&#xff09;和进程堆栈指针&#xff08;PSP&#xff09;。 1.1 MSP是…

12.0、Java_IO流 - 字节数组输入输出流

12.0、Java_IO流 - 字节数组输入输出流 字节数组流&#xff1a; ByteArrayInputStream 和 byteArrayOutputStream 经常用在需要流和数组之间转化的情况&#xff1b; 字节数组输入流&#xff1a; 说白了&#xff0c;FileInputStream 是把文件当做数据源&#xff1b;ByteArrayInp…

【vue】路由的搭建以及嵌套路由

目的&#xff1a;学习搭建vue2项目基础的vue路由和嵌套路由 1.npm 安装 router npm install vue-router3.6.52.src下新建文件夹router文件夹以及文件index.js index.js import Vue from vue import VueRouter from "vue-router" import Home from ../views/Home.…

H3C-Cloud Lab实验-NAT实验

实验拓扑图&#xff1a; IP地址规划&#xff1a; 实验需求&#xff1a; 1. 按照图示配置 IP 地址 2. 私网 A 通过 R1 接入到互联网&#xff0c;私网 B 通过 R3 接入到互联网 3. 私网 A 内部存在 Vlan10 和 Vlan20&#xff0c;通过 R1 上单臂路由访问外部网络 4. 私网 A 通过…

【数据结构常见七大排序(二)】—选择排序篇【直接选择排序】And【堆排序】

目录 前言 1.直接选择排序 1.1基本思想 1.2直接选择排序实现过程 1.3动图助解 1.4直接选择排序源码 2.堆排序 2.1堆排序的概念 2.2堆排序源码 前言 选择排序有两种常见的【直接选择排序】、【堆排序】 1.直接选择排序 1.1基本思想 每一次从待排序的数据元素中选出最…

(数据结构)(C++)数组——约瑟夫环求解

#define _CRT_SECURE_NO_WARNINGS 1 #include <iostream>#define MaxSize 10using namespace std;void josephus(int n,int m)//一共n个人数到m的出列 {int p[MaxSize];int i,j,t;for(i0;i<n;i){p[i]i1;//构建初始序列&#xff08;1,2,3,4.....) } t0;//首次报数起始位…