SpringBoot整合rabbitmq-重复消费问题

说明:重复消费的原因大致是生产者将信息A发送到队列中,消费者监听到消息A后开始处理业务,业务处理完成后,监听在告知rabbitmq消息A已经被消费完成途中中断,也就时说我已经处理完业务,而队列中还存在当前消息A,导致消费者服务恢复后又消费到消息A,出现重复操作的业务。

解决思路:我只要有一个地方记录了消息A已经被消费过了【这个消息必须得设置一个唯一标记】,即使消息A再次被消费时,比对一下,如果有记录则说明消息A已经被消费,如果没有说明没有被消费。

我使用redis及设置redis过期时间来解决重复消费问题。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 --><groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 --><version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 --></parent><groupId>RabbitmqDemoOne</groupId><artifactId>RabbitmqDemoOne</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>RabbitmqDemoOne Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><!-- commons-lang --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><finalName>RabbitmqDemoOne</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>

2.application.yml

server:port: 8080
spring:redis:host: 127.0.0.1port: 6379rabbitmq:port: 5672host: 192.168.199.139username: adminpassword: adminvirtual-host: /

3.RabbitMqConfig

package com.dev.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 李庆伟* @title: RabbitMqConfig* @date 2024/3/3 14:12*/
@Configuration
public class RabbitMqConfig {/*** 队列* @return repeatQueue队列名称 true 持久化*/@Beanpublic Queue makeQueue(){return new Queue("repeatQueue",true);}}

4.RedisTemplateConfig

package com.dev.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author 李庆伟* @title: RedisTemplateConfig* @date 2024/3/3 14:24*/
@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);// 设置键(key)的序列化采用StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());//redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值(value)的序列化采用jdk// 设置值(value)的序列化采用FastJsonRedisSerializer。redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}}

5.RabbitRepeatController

package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author 李庆伟* @title: RabbitRepeatContoller* @date 2024/3/3 14:05*/
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法/*** 测试* @return*/@GetMapping("/sendMessage")public String sendMessage() {for (int i = 0; i < 1000; i++) {String id = UUID.randomUUID().toString().replace("-","");Map<String,Object> map = new HashMap<>();map.put("id",id);map.put("name","张龙");map.put("phone","123..11");map.put("num",i);String str = JSONObject.toJSONString(map);Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend("", "repeatQueue", msg);}return "ok";}}

6.RabbitMqListener

package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Map;/*** @author 李庆伟* @title: RabbitMqListener* @date 2024/3/3 14:13*/
@Component
public class RabbitMqListener {@Autowiredprivate RedisUtil redisUtil;@RabbitListener(queues = "repeatQueue")@RabbitHandlerpublic void process(Message msg) throws UnsupportedEncodingException {//获取在发送消息时设置的唯一idString id = msg.getMessageProperties().getMessageId();//去redis中查看是否有记录,如果有证明已经消费过了String val = redisUtil.get(id);if(StringUtils.isNotEmpty(val)){return;}String str = new String(msg.getBody(),"utf-8");if(StringUtils.isNotEmpty(str)){Map<String,Object> map = JSON.parseObject(str,Map.class);System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));//将消费过的消息记录到redis中,失效时间为1个小时redisUtil.set(id,id,3600L);System.out.println("----------");}}}

7.RedisUtil

package com.dev.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @author 李庆伟* @title: RedisUtil* @date 2024/3/3 14:27*/@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 批量删除对应的value** @param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量删除key** @param pattern*/public void removePattern(final String pattern) {Set<Serializable> keys = redisTemplate.keys(pattern);if (keys.size() > 0)redisTemplate.delete(keys);}/*** 删除对应的value** @param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判断缓存中是否有对应的value** @param key* @return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 读取缓存** @param key* @return*/public String get(final String key) {Object result = null;redisTemplate.setValueSerializer(new StringRedisSerializer());ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();result = operations.get(key);if(result==null){return null;}return result.toString();}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 写入缓存** @param key* @param value* @return*/public boolean set(final String key, Object value, Long expireTime) {boolean result = false;try {ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  boolean hmset(String key, Map<String, String> value) {boolean result = false;try {redisTemplate.opsForHash().putAll(key, value);result = true;} catch (Exception e) {e.printStackTrace();}return result;}public  Map<String,String> hmget(String key) {Map<String,String> result =null;try {result=  redisTemplate.opsForHash().entries(key);} catch (Exception e) {e.printStackTrace();}return result;}/*** 递增** @param key 键* @paramby 要增加几(大于0)* @return*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** @param key 键* @paramby 要减少几(小于0)* @return*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}/*** redis zset可已设置排序(案例,热搜)** @param key 键* @paramby* @return*/public void zadd(String key ,String name) {BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);//自增长后的数据boundZSetOperations.incrementScore(name,1);}}

8.App

package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 李庆伟* @title: App* @date 2024/3/3 14:01*/
@SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/509218.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM64汇编02 - 寄存器与指令基本格式

最近的文章可能会有较多修改&#xff0c;请关注博客哦 异常级别 ARMv8处理器支持4种异常等级&#xff08;Exception Level&#xff0c;EL&#xff09;。 EL0 为非特权模式&#xff0c;用于运行应用程序&#xff0c;其他资源访问受限&#xff0c;权限不够。 EL1 为特权模式&…

短剧分销系统开发,短剧爆火下的商业机遇

这几年来&#xff0c;短剧市场一直保持着快速发展的步伐&#xff0c;在行业中掀起了了一股风潮。短剧被大众当做“电子榨菜”&#xff0c;符合了当下人们的碎片化时间。节奏快、剧情紧凑的特点深受大众的追捧&#xff0c;短剧的市场规模也超过了百亿元。 在短剧的爆火下&#…

【回溯算法】【组合问题】Leetcode 77.组合 216. 组合总和 III

【回溯算法】【回溯算法剪枝】 Leetcode 77.组合 216. 组合总和 III 回溯算法可以解决的问题Leetcode 77.组合解法1 回溯法三部曲&#xff0c;函数参数、终止条件和单层搜索逻辑解法一plus 回溯法剪枝 另一道组合回溯问题 216. 组合总和 III解法&#xff1a;回溯解法&#xff1…

Scratch 第十六课-弹珠台游戏

第十六课-弹珠台游戏 大家好&#xff0c;今天我们一起做一款弹珠台scratch游戏&#xff0c;我们也可以叫它弹球游戏&#xff01;这款游戏在刚出来的时候非常火爆。小朋友们要认真学习下&#xff01; 这节课的学习目标 物体碰撞如何处理转向问题。复习键盘对角色的控制方式。…

PostgreSQL10.21与PostGIS3.2.3安装文档

背景&#xff1a; 公司需要在一个服务器上装一个pg数据库&#xff0c;要求和其余服务器版本尽量保持一致&#xff0c;临时拉我装一下 特别注意&#xff1a; 需要注意的地方就是因为postgresql数据库是一个空间库&#xff0c;gis行业很多都会使用这个数据库&#xff0c;我们安…

【微信小程序】底部菜单(tabBar)

1、首先在app.json中设置pages 首页和我的页面 2、在app.json文件中添加tabar底部菜单信息 详细参数请参考文档 全局配置 | 微信开放文档

5、Linux-vi编辑器

目录 一、介绍 二、三种模式 1、命令模式&#xff08;默认&#xff09; 2、插入模式 3、末行模式 4、模式转换 三、基本操作 1、保存文件&#xff08;末行模式下&#xff09; 2、行号&#xff08;末行模式下&#xff09; 3、查找&#xff08;末行模式下&#xff09; …

详解动态规划(算法村第十九关青铜挑战)

不同路径 62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finis…

软考-中级-系统集成2023年综合知识(四)

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 软考中级专栏回顾 专栏…

电子签名技术如何保障电子合同的法律效力?

在当今数字化浪潮中&#xff0c;电子合同和电子签名逐渐取代了传统的纸质合同和手写签名&#xff0c;成为商业活动中的新宠。尽管如此&#xff0c;许多人对于电子签名的法律效力仍存有疑问。以下是对电子合同和电子签名相关法律效力的详细解读。 首先&#xff0c;让我们澄清什么…

数据中台:数字中国战略关键技术实施

这里写目录标题 前言为何要建设数据中台数据中台建设痛点数据中台学习资料聚焦前沿&#xff0c;方法论体系更新与时俱进&#xff0c;紧跟时代热点深入6大行业&#xff0c;提炼实践精华大咖推荐&#xff0c;数字化转型必备案头书 前言 在数字中国这一国家战略的牵引下&#xff0…

TypeScript 哲学 - everyday Type

1、 2、TypeScript a structurally typed type system. 3、 type vs interface 3、literal reference 4、non-null assertion operator