Rabbitmq的消息确认

配置文件

spring:rabbitmq:publisher-confirm-type: correlated #开启确认回调publisher-returns: true #开启返回回调listener:simple:acknowledge-mode: manual #设置手动接受消息

消息从生产者到交换机
无论消息是否到交换机ConfirmCallback都会触发。

    @Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {//构造方法执行之后执行,用于初始化一些信息rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息成功到达交换机");return;}//未到达交换机可以采取一系列措施保证消息不会丢失log.error("消息未发送到交换机{}", cause);}});}

消息从交换机到队列
只有消息没到达队列才会触发ReturnsCallback

    @Resourceprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息没有从交换机到达队列{}", returned.getReplyText());}});}

消息从队列到消费者(ACK)
消息默认是自动确认的(手动确认需配置文件开启),无论消息是否被成功消费都会被确认,确认后消息就会自动删除

Channel接口里有三个方法

// deliveryTag消息的唯一表示 multiple 为true可以批量处理这条消息之前的所有消息,假设你的消费者从 RabbitMQ 中获取了一批消息,然后在处理完这批消息后,你可以一次性确认所有消息,而不需要一个一个地确认。requeue 是否重新入队,不重新入队就会变成死信,如果配置了死信交换机和死信队列就会进入死信队列,没有配置消息就直接删除
void basicAck(long deliveryTag, boolean multiple) //确认消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue)//不确认消息
void basicReject(long deliveryTag, boolean requeue)// 拒绝消息

示例代码

    @RabbitListener(queues = {"queue.direct.i"})public void receiveMessage2(Message message, Channel channel) {MessageProperties messageProperties = message.getMessageProperties();//消息的唯一标识,发消息时自动添加,消息的身份证long deliveryTag = messageProperties.getDeliveryTag();try {byte[] body = message.getBody();log.info("接收到的消息为{}", new String(body));//multiple false 表示只确认当前消息 true 确认所有消息channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("处理过程出错{}", e.getMessage());try {//requeue true 重新入队 false 进入死信队列,如果没有死信队列则直接删除channel.basicNack(deliveryTag, false, false);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}}

消息可靠性投递方案一.jpg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/54599.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux(环境变量)

Linux(环境变量) 常见环境变量查看环境变量方法和环境变量相关的指令环境变量的组织方式通过代码如何获取环境变量 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数如:我们在编写C/C代码的时候&…

【Nginx基础】Nginx基础及安装

目录 Nginx出现背景Nginx 概念Nginx 作用Http 代理,反向代理负载均衡:内置策略和扩展策略内置策略:轮询内置策略:加权轮询内置策略:IP hash 动静分离 安装 NginxWindows下安装(nginx-1.16.1)Lin…

安装zabbix5.0监控

官网安装手册: https://www.zabbix.com/cn/download 一、 安装zabbix a. 安装yum源 rpm -Uvh https://repo.zabbix.com/zabbix/5.0/rhel/7/x86_64/zabbix-release-5.0-1.el7.noarch.rpmyum clean allb. 安装Zabbix server,web前端,agent y…

Kubernetes关于cpu资源分配的设计

kubernetes资源 在K8s中定义Pod中运行容器有两个维度的限制: 资源需求(Requests):即运行Pod的节点必须满足运行Pod的最基本需求才能运行Pod。如 Pod运行至少需要2G内存,1核CPU。(软限制)资源限额(Limits):即运行Pod期间,可能内存使用量会增加,那最多能使用多少内存,这…

MongoDB 使用总结

🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢…

【技能实训】DMS数据挖掘项目(完整程序)

文章目录 1. 系统需求分析1.1 需求概述1.2 需求说明 2. 系统总体设计2.1 编写目的2.2 总体设计2.2.1 功能划分2.2.2 数据库及表2.2.3 主要业务流程 3. 详细设计与实现3.1 表设计3.2 数据库访问工具类设计3.3 配置文件3.4 实体类及设计3.5 业务类及设计3.6 异常处理3.7 界面设计…

docker 安装 字体文件

先说一下我当前的 场景 及 环境,这样同学们可以先评估本篇文章是否有帮助。 环境: dockerphp8.1-fpmwindows 之所以有 php,是因为这个功能是使用 php 开发的,其他语言的同学,如果也有使用到 字体文件,那么…

zookeeper集群和kafka的相关概念就部署

目录 一、Zookeeper概述 1、Zookeeper 定义 2、Zookeeper 工作机制 3、Zookeeper 特点 4、Zookeeper 数据结构 5、Zookeeper 应用场景 (1)统一命名服务 (2)统一配置管理 (3)统一集群管理 (4&a…

vue2-vue项目中你是如何解决跨域的?

1、跨域是什么? 跨域本质是浏览器基于同源策略的一种安全手段。 同源策略(sameoriginpolicy),是一种约定,它是浏览器最核心也是最基本的安全功能。 所谓同源(即指在同一个域)具有以下三个相同点…

mysql大表的深度分页慢sql案例(跳页分页)

1 背景 有一张表,内容是 redis缓存中的key信息,数据量约1000万级, expiry列上有一个普通B树索引。 -- test.top definitionCREATE TABLE top (database int(11) DEFAULT NULL,type varchar(50) DEFAULT NULL,key varchar(500) DEFAULT NUL…

【驱动开发day8作业】

作业1&#xff1a; 应用层代码 #include <stdlib.h> #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <string.h> #include <sys/ioctl.h>int main(int…

express学习笔记6 - 用户模块

新建router/user.js const express require(express) const routerexpress.Router() router.get(/login, function(req, res, next) {console.log(/user/login, req.body)res.json({code: 0,msg: 登录成功})})module.exportsrouter 在router/user.js引入并使用 const us…