Rabbitmq消息丢失-生产者消息丢失(一)

说明:消息生产者在将数据发送到Mq的时候,可能由于网络等原因造成数据投递失败。

消息丢失大致分三种:这里说的是生产者消息丢失

分析原因:

1.有没有一种可能,我刚发送消息,消息还没有到交换机就断网了,是不是消息就没有发送成功,这个时候如果不对这种情况处理,消息是不是就丢失了

2.又有没有一种可能,我又发送了一条消息,交换机拿到消息后正要发送给某个队列,就是你,你把那个队列给删掉了,这个时候消息找不到队列,消息就也丢失了

解决方法:

1.事务:Rabbitmq提供了事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

缺点:RabbitMQ 事务机制是同步的,提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,太耗性能了

2.confirm机制:相比于事务的同步,confirm机制是异步的,你发送完这个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块解决数据丢失,建议使用 confirm 机制。

话不多说,干代码

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 --></parent><groupId>MqLossDemo</groupId><artifactId>MqLossDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>MqLossDemo Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.1.7.RELEASE</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>MqLossDemo</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:rabbitmq:port: 5672host: 你的 rabbitmq IPusername: adminpassword: adminvirtual-host: /# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: truetemplate:#在配置文件中配置 mandatory: true 页无用,需要在RabbitTemplate中手动设置mandatory: true

3.RabbitMqConfig

package com.dev.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqConfig {@Beanpublic ConfirmCallbackService confirmCallbackService() {return new ConfirmCallbackService();}@Beanpublic ReturnCallbackService returnCallbackService() {return new ReturnCallbackService();}@Beanpublic RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);//生产者发送消息到Mq交换机回执,手动ack回执回调处理//可以理解为:消息推送到server,但是在server里找不到交换机//如果想看效果【先清除交换机和队列】:在工程运行前注释掉RabbitMqQueueConfig类中的directExchange和bindingDirect方法rabbitTemplate.setConfirmCallback(confirmCallbackService());//生产者发送消息到Mq,交换机发送到队列回执,一定要设置手动设置Mandatory(true),配置文件中不生效//可以理解为:消息推送到server,但是在server里找不到队列//如果想看效果【先清除交换机和队列】:如果之前看过setConfirmCallback效果,先去掉RabbitMqQueueConfig类中注释//           在工程运行前注释掉RabbitMqQueueConfig类中的directQueue和bindingDirect方法rabbitTemplate.setReturnCallback(returnCallbackService());rabbitTemplate.setMandatory(true);return rabbitTemplate;}//生产者发送消息到Mq交换机回执 //可以理解为:消息推送到server,但是在server里找不到交换机class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {//log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);System.out.println(correlationData);System.out.println(ack);System.out.println(cause);System.out.println("--------");} else {System.out.println("消息发送异常!");//可以进行重发等操作//这里可以处理失败的业务}}}//生产者发送的消息到Mq队列回执 //可以理解为:消息推送到server,但是在server里找不到队列class ReturnCallbackService implements RabbitTemplate.ReturnCallback {//public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println(message.getMessageProperties().getMessageId());System.out.println(new String(message.getBody()));System.out.println(i);System.out.println(s);System.out.println(s1);System.out.println(s2);//可以将消息存储到一个新的位置,这里可以处理失败的业务}}}

4.RabbitMqQueueConfig

package com.dev.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:12*/
@Configuration
public class RabbitMqQueueConfig {//绑定键public final static String QUEUE_ONE = "loss_queue";public final static String EXCHANGE_ONE = "loss_exchange";@Beanpublic Queue directQueue() {return new Queue(RabbitMqQueueConfig.QUEUE_ONE);}//Direct交换机 起名:directExchange@BeanDirectExchange directExchange() {return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey@BeanBinding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");}}

5.RabbitContoller

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 类名称:消息丢失问题** @author lqw* @date 2024年02月27日 14:47*/
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法/*** 消息丢失* @return*/@GetMapping("/sendMessage")public String sendMessage() {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");Message msg = MessageBuilder.withBody(JSONObject.toJSONString(map).getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);return "ok";}}

6.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** 类名称:** @author 李庆伟* @date 2024年03月04日 14:11*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/511265.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

06-prometheus的数据存储

一、本地存储prometheus收集的监控数据 就是将默认的存储&#xff0c;修改为“我们指定”的目录下&#xff1b; 1&#xff0c;配置systemctl启动文件 [rootprometheus-server32 ~]# vim /etc/systemd/system/prometheus-server.service [Unit] DescriptionPrometheus Server D…

第五套CCF信息学奥赛c++练习题 CSP-J认证初级组 中小学信奥赛入门组初赛考前模拟冲刺题(阅读程序题)

第五套中小学信息学奥赛CSP-J考前冲刺题 二、阅读程序题 (程序输入不超过数组或字符串定义的范围&#xff0c;判断题正确填√错误填X;除特殊说明外&#xff0c;判断题 1.5分&#xff0c;选择题3分&#xff0c;共计40分) 第一题 递归函数 1 #include<iostream> 2 usin…

使用 helm repo add istio添加了一个helm chart repo,如何查看istio的版本呢

1. 添加chart repo helm repo add istio https://istio-release.storage.googleapis.com/charts helm repo update2. 查看版本 helm search repo istio 3. 查看版本详细信息 helm show chart istio/cni 4. 查看某个chart的历史版本 helm search repo <chart-name> --…

适用于 Windows 的7大数据恢复软件解决方案

数据丢失是数字世界中令人不快的一部分&#xff0c;它会在某一时刻影响许多计算机用户。很容易意外删除一些重要文件&#xff0c;这可能会在您努力恢复它们时带来不必要的压力。幸运的是&#xff0c;数据恢复软件可以帮助恢复已删除的文件&#xff0c;即使您没有备份它们。以下…

绝地求生:小团团曝被判8年:地图语音包或将下架,公会和主播被一锅端

这两天大家都在讨论某鱼平台办卡抽奖的事情。这件事发生之后没多久&#xff0c;某鱼平台里面的大量主播在同一时间停播&#xff0c;闲游盒认为大家应该也懂的&#xff0c;这次停播就是因为这些主播也参与了办卡抽奖&#xff0c;都需要接受调查&#xff0c;在两个月左右的调查中…

关于vue创建项目以及关于eslint报错的问题

vue创建完项目以后如果报parsing error no babel config file。。。这样的错误的话&#xff0c;关闭项目&#xff0c;用vscode进入项目中打开项目就可以解决了。 1 代码保存的时候会自动将单引号报错为双引号 导致eslint报错的问题&#xff0c; 解决思路&#xff1a; 在项目根…

STM32自学☞I2C

这里只是大体介绍&#xff0c;具体的可参考STM32数据手册

mysql学习笔记7——数据库查询深入

.sql文件 在实际使用数据库时&#xff0c;常常要对数据库文件进行备份&#xff0c;以便在数据库遭到入侵或者非人为因素导致损坏后&#xff0c;快速恢复数据 .sql文件便提供了这种功能&#xff0c;首先.sql文件是由一串串mysql指令组成的&#xff0c;我们插入.sql文件实际相当…

加密与安全_ 凯撒密码

文章目录 Pre概述Code 实现 凯撒密码字母频率分析攻击Code解密凯撒密码 小结 Pre PKI - 02 对称与非对称密钥算法 概述 凯撒密码是一种简单的替换加密技术&#xff0c;也称为移位密码。它是古典密码学中最早的密码之一&#xff0c;得名于古罗马军队领袖凯撒尤利乌斯&#xff…

『Linux从入门到精通』第 ㉕ 期 - System V 共享内存

文章目录 &#x1f490;专栏导读&#x1f490;文章导读&#x1f427;共享内存原理&#x1f427;共享内存相关函数&#x1f426;key 与 shmid 区别 &#x1f427;代码实例 &#x1f490;专栏导读 &#x1f338;作者简介&#xff1a;花想云 &#xff0c;在读本科生一枚&#xff0…

用Arduino中Wire库写I2C驱动-提高篇(IP2368芯片驱动为例)

之前写了一篇文章“用Arduino中Wire库写I2C驱动-入门篇”&#xff0c;链接地址&#xff1a;用Arduino中Wire库写I2C驱动-入门篇_arduino wire库-CSDN博客对I2C驱动编写做了一个简单的介绍&#xff0c;这一篇里&#xff0c;我们将使用IP2368这个芯片为例&#xff0c;详细的讲解一…

Prometheus结合Grafana监控MySQL,这篇不可不读!

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…