java springBoot实现RabbitMq消息队列 生产者,消费者

1.RabbitMq的数据源配置文件

# 数据源配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#消息发送和接收确认publisher-confirms: truepublisher-returns: truelistener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manualretry:enabled: true #是否开启消费者重试max-attempts: 5 #最大重试次数initial-interval: 2000 #重试间隔时间(单位毫秒)

2.maven依赖

<!-- rabbitmq -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.RabbitMq文件目录预览

4. RabbitMq的Action文件

package com.zq.cnz.mq.constant;public enum Action {ACCEPT, // 处理成功RETRY, // 可以重试的错误REJECT, // 无需重试的错误
}

5.RabbitMq的QueueContent文件

package com.zq.cnz.mq.constant;/*** @ClassName: QueueContent* @Description: 消息队列名称* @author 吴顺杰* @date 2023年11月15日**/
public class QueueContent {/*** 测试消息队列*/public static final String TEST_MQ_QUEUE = "test_mq_queue";/*** 测试消息队列交换机*/public static final String TEST_MQ_QUEUE_EXCHANGE = "test_mq_queue_exchange";/*** 测试消息延迟消费队列*/public static final String TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE = "test_mq_queue_time_delay_exchange";}

6.消息队列生产者MessageProvider方法

package com.zq.cnz.mq;import com.alibaba.fastjson.JSONObject;
import com.zq.common.utils.IdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消息队列生产*/
@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback {static Logger logger = LoggerFactory.getLogger(MessageProvider.class);/*** RabbitMQ 模版消息实现类*/protected RabbitTemplate rabbitTemplate;public MessageProvider(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);}private String msgPojoStr;/*** 推送消息至消息队列** @param msg* @param queueName*/public void sendMqMessage(String queueName,String msg) {try {JSONObject object = JSONObject.parseObject(msg);String msgId = IdUtils.fastUUID().toString();object.put("msgId", msgId);msg = object.toString();msgPojoStr = msg;logger.info("推送消息至" + queueName + "消息队列,消息内容" + msg);rabbitTemplate.convertAndSend(queueName, msg);} catch (AmqpException e) {e.printStackTrace();logger.error("推送消息至消息队列异常 ,msg=" + msg + ",queueName=" + queueName, e);}}/*** 推送广播消息** @param exchangeName* @param msg*/public void sendFanoutMsg(String exchangeName, String msg) {try {JSONObject object = JSONObject.parseObject(msg);String msgId = IdUtils.fastUUID().toString();object.put("msgId", msgId);msg = object.toString();msgPojoStr = msg;logger.info("推送广播消息至交换机" + exchangeName + ",消息内容" + msg);rabbitTemplate.convertAndSend(exchangeName, "", msg);} catch (AmqpException e) {e.printStackTrace();logger.error("推送广播至交换机异常 ,msg=" + msg + ",exchangeName=" + exchangeName, e);}}/*** 发送延时消息** @param queueName* @param msg*/public void sendTimeDelayMsg(String queueName, String exchangeName, String msg, Integer time) {try {JSONObject object = JSONObject.parseObject(msg);String msgId = IdUtils.fastUUID().toString();object.put("msgId", msgId);msg = object.toString();msgPojoStr = msg;logger.info("推送延时消息至" + exchangeName + "," + queueName + "消息队列,消息内容" + msg + ",延时时间" + time + "秒");rabbitTemplate.convertAndSend(exchangeName, queueName, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", time * 1000);return message;}});} catch (AmqpException e) {e.printStackTrace();logger.error("推送消息至消息队列异常 ,msg=" + msg + ",exchangeName=" + exchangeName + ",queueName=" + queueName+ ",time=" + time, e);}}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {logger.info(msgPojoStr + ":消息发送成功");} else {logger.warn(msgPojoStr + ":消息发送失败:" + cause);}}}

7.消息队列消费者RabbitMqConfiguration文件配置

package com.zq.cnz.mq;import com.zq.cnz.mq.constant.QueueContent;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfiguration {@ResourceRabbitAdmin rabbitAdmin;// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}/*** 测试消息队列** @return*/@Beanpublic Queue TEST_QUEUE() {return new Queue(QueueContent.TEST_MQ_QUEUE);}/*** 测试交换机** @return*/@BeanFanoutExchange TEST_MQ_QUEUE_EXCHANGE() {return new FanoutExchange(QueueContent.TEST_MQ_QUEUE_EXCHANGE);}/*** 测试延迟消费交换机** @return*/@Beanpublic CustomExchange TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE, "x-delayed-message", true, false, args);}/*** 测试延迟消费交换机绑定延迟消费队列** @return*/@Beanpublic Binding banTestQueue() {return BindingBuilder.bind(TEST_QUEUE()).to(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE()).with(QueueContent.TEST_MQ_QUEUE).noargs();}// 创建交换机和对列,跟上面的Bean的定义保持一致@Beanpublic void createExchangeQueue() {//测试消费队列rabbitAdmin.declareQueue(TEST_QUEUE());//测试消费交换机rabbitAdmin.declareExchange(TEST_MQ_QUEUE_EXCHANGE());//测试延迟消费交换机rabbitAdmin.declareExchange(TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE());}}

8.TestQueueConsumer 消息队列消费+延迟消费

package com.zq.cnz.mq.MessageConsumer;import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** 测试消息队列消费*/
@Component
@RabbitListener(queues = QueueContent.TEST_MQ_QUEUE)
public class TestQueueConsumer {@Autowiredprivate RedisUtils redisUtils;static final Logger logger = LoggerFactory.getLogger(TestQueueConsumer.class);@RabbitHandlerpublic void handler(String msg, Channel channel, Message message) throws IOException {if (!StringUtils.isEmpty(msg)) {JSONObject jsonMsg = JSONObject.parseObject(msg);
//            logger.info("TestQueueConsumer:"+jsonMsg.toJSONString());Action action = Action.RETRY;
//			获取消息IDString msgId = jsonMsg.getString("msgId");
//			消费次数+1redisUtils.incr("MQ_MSGID:" + msgId, 1);redisUtils.expire("MQ_MSGID:" + msgId, 60);try {logger.info("测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));action = Action.ACCEPT;} catch (Exception e) {logger.error("MQ_MSGID:" + msgId + ",站控权限请求关闭接口异常,msg=" + msg, e);} finally {// 通过finally块来保证Ack/Nack会且只会执行一次if (action == Action.ACCEPT) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {logger.error("MQ_MSGID:" + msgId + ",异步处理超出失败次数限制,msg=" + msg);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {
//						回归队列重新消费channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}} else {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}}}
}

9.TestExchangeConsumer 交换机广播模式 

package com.zq.cnz.mq.MessageConsumer;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.rabbitmq.client.Channel;
import com.zq.cnz.mq.constant.Action;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import com.zq.common.utils.RedisUtils;
import com.zq.common.utils.StringUtils;
import com.zq.common.utils.spring.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;/*** 测试交换机消费*/
@Component
@RabbitListener(bindings = @QueueBinding(value = @Queue(), exchange = @Exchange(value = QueueContent.TEST_MQ_QUEUE_EXCHANGE, type = ExchangeTypes.FANOUT)))
public class TestExchangeConsumer {static final Logger logger = LoggerFactory.getLogger(TestExchangeConsumer.class);@Resourceprivate RedisUtils redisUtils;@RabbitHandlerpublic void handler(String msg, Channel channel, Message message) throws IOException {if (!StringUtils.isEmpty(msg)) {
//            logger.info("接收交换机生产者消息:{}", msg);Action action = Action.ACCEPT;// 请求参数JSONObject jsonMsg = JSONObject.parseObject(msg);
//			获取消息IDString msgId = jsonMsg.getString("msgId");//			消费次数+1redisUtils.incr("MQ_MSGID:" + msgId, 1);redisUtils.expire("MQ_MSGID:" + msgId, 60);try {Integer CMD = jsonMsg.getInteger("cmd");if (CMD==1) {logger.info("cmd1测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));}else if(CMD==2){logger.info("cmd2测试消费队列消费成功啦,消费信息:"+jsonMsg.getString("test"));}action = Action.ACCEPT;} catch (Exception e) {action = Action.REJECT;e.printStackTrace();} finally {// 通过finally块来保证Ack/Nack会且只会执行一次if (action == Action.ACCEPT) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else if (action == Action.RETRY) {
//					判断当前消息消费次数,已经消费3次则放弃消费if ((Integer) redisUtils.get("MQ_MSGID:" + msgId) >= 3) {logger.error("MQ_MSGID::" + msgId + ",换电失败消息队列消费了三次,msg=" + msg);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {
//						回归队列重新消费channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}} else {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}}}
}

运行项目 调用RabbitmqTestController生产RabbitMq消息体, TestExchangeConsumer和TestQueueConsumer自动消费

package com.zq.web.controller.tool;
import com.alibaba.fastjson.JSONObject;
import com.zq.cnz.mq.MessageProvider;
import com.zq.cnz.mq.constant.QueueContent;
import com.zq.common.utils.IdUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** 消息队列测试*/
@RestController
@RequestMapping("/test/mq")
public class RabbitmqTestController {@Resourceprivate MessageProvider messageProvider;/*** 查询储能站信息列表*/@GetMapping("/putMq")public void putMq(){JSONObject obj=new JSONObject();obj.put("test","测试数据");//推送消息至消息队列messageProvider.sendMqMessage(QueueContent.TEST_MQ_QUEUE,obj.toString());obj.put("cmd",1);obj.put("test","这是广播消费");//推送广播消息messageProvider.sendFanoutMsg(QueueContent.TEST_MQ_QUEUE_EXCHANGE,obj.toString());//发送延时消息obj.put("cmd",2);obj.put("test","这是延迟消费");messageProvider.sendTimeDelayMsg(QueueContent.TEST_MQ_QUEUE,QueueContent.TEST_MQ_QUEUE_TIME_DELAY_EXCHANGE,obj.toString(),2*60);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/177828.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JavaEE】Servlet API 详解(HttpServletResponse类方法演示、实现自动刷新、实现自动重定向)

一、HttpServletResponse HttpServletResponse表示一个HTTP响应 Servlet 中的 doXXX 方法的目的就是根据请求计算得到相应, 然后把响应的数据设置到 HttpServletResponse 对象中 然后 Tomcat 就会把这个 HttpServletResponse 对象按照 HTTP 协议的格式, 转成一个字符串, 并通…

JavaScript 语句、标识符、变量

语句 JavaScript程序的单位是行(line),也就是一行一行地执行。一般情况下&#xff0c;每一行就是一个语句 var num 10; 语句以分号结尾&#xff0c;一个分号就表示一个语句结束。 标识符 标识符(identifier)指的是用来识别各种值的合法名称。最常见的标识符就是变量名标识符…

微信昵称后面的“小耳朵”是干什么用的?

微信&#xff0c;一款我们日常使用频繁的社交软件&#xff0c;它的功能远不止于聊天、刷朋友圈、支付和刷视频。其实&#xff0c;微信的许多不常用功能可以解决我们的实际问题。 聊天时&#xff0c;我发现朋友微信昵称后面多了一个神秘的小耳朵图标&#xff0c;引发了我的好奇心…

按键精灵中的字符串常用的场景

在使用按键精灵编写脚本时&#xff0c;与字符串有关的场景有以下几种&#xff1a; 1. 用时间字符串记录脚本使用截止使用时间 Dim localTime "2023-11-12 00:15:14" Dim networkTime GetNetworkTime() TracePrint networkTime If networkTime > localTime The…

关系选择器

关系选择器&#xff0c;说明元素和元素之间需要存在关系了。 后代选择器 定义&#xff1a;选择所有被E元素包含的F元素&#xff0c;中间用空格隔开 语法&#xff1a;E F{ } 选择E元素下面所有的F元素 <ul><li>宝马</li><li>奔驰</li> </u…

Git的基本操作以及原理介绍

文章目录 基本操作创建git仓库配置name和email .git目录的结构git add & git commit.git目录结构的变化 git追踪管理的数据git的版本回退回退的原理回退的三种情况 版本库中文件的删除git分支管理分支的删除合并分支时的冲突分支的合并模式分支策略git stash不要在master分…

ARM 基础学习记录 / ARM 裸机编程

汇编程序调用 C 程序详情 在 C 程序和 ARM 汇编程序之间相互调用时必须遵守 ATPCS 规则&#xff0c;其是基于 ARM 指令集和 THUMB 指令集过程调用的规范&#xff0c;规定了调用函数如何传递参数&#xff0c;被调用函数如何获取参数&#xff0c;以何种方式传递函数返回值。 寄存…

Shadows实时阴影原理

文章目录 一、Shadows Mapping1.第一个Pass&#xff1a;从光源Light射出方向出发&#xff0c;记录到达像素&#xff08;片元&#xff09;最浅的距离2.第二个Pass:从眼睛&#xff08;摄像头&#xff09;看向方向出发&#xff0c;渲染场景得到像素&#xff08;片元&#xff09;&a…

【广州华锐互动】地震防灾减灾科普3D虚拟展厅:向公众普及地震安全知识

在面对自然灾害时&#xff0c;我们都需要有足够的知识和准备来保护自己和他人。这就是为什么地震安全知识的普及如此重要。然而&#xff0c;传统的教育方法可能无法满足所有人的需求&#xff0c;特别是在这个数字化的时代。为了解决这个问题&#xff0c;广州华锐互动制作开发了…

人工智能基础_机器学习035_多项式回归升维实战2_使用sklearn的PolynomialFeatures进行升维---人工智能工作笔记0075

我们再来做一个升维处理,这里我们不再自己去对数据进行比如,相乘操作,来给数据手动添加维度了, 这里我们用sklearn库提供的PolynomialFeatures来自动对数据进行升维. from sklearn.linear_model import LinearRegression # PolynowlalFeatures,多项式升维处理 from sklearn.…

音画欣赏|《纯洁的梦乡》

《纯洁的梦乡》 80x60cm 陈可之2021年绘 题龙阳县青草湖 【元】唐温如 西风吹老洞庭波&#xff0c;一夜湘君白发多。 醉后不知天在水&#xff0c;满船清梦压星河。 车遥遥篇 【宋】范成大 车遥遥&#xff0c;马憧憧。 君游东山东复东&#xff0c;安得奋飞逐西风。 愿我如星…

notpad++正则化,利用关键字符删除整行

首先&#xff0c;ctrlf,选中[替换]&#xff0c;勾选正则表达式&#xff08;可以勾选[匹配大小写]&#xff0c;不用勾选[匹配新行]&#xff09;。在[查找目标]框输入[^(.*)"car_no_clean"(.*)$\n]。在$后加上\n&#xff0c;可以将被替换的行直接删除&#xff0c;不加则…