利用RabbitMQ实现消息投递削峰填谷

目录

异步和同步如何选择

异步线程 同步收发消息

一、导入依赖库

二、创建RabbitMQ配置类

三、创建消息任务类


异步和同步如何选择

·依靠多线程,Java代码可以同步执行也可以异步执行

·RabbitMQ提供了同步和异步两种收发消息模式

·我们采用 Java异步线程 MQ同步收发消息

异步线程 同步收发消息

一、导入依赖库

在 pom.xml 文件中添加RabbitMQ的依赖库 

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、创建RabbitMQ配置类

        连接 RabbitMQ 需要用到 ConnectionFactory ,所以我们要自己创建好 ConnectionFactory 对象然后注册给Spring框架,这就需要我们创建 RabbitMQConfig 类。 

@Configuration
public class RabbitMQConfig {@Beanpublic ConnectionFactory getFactory(){ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);return factory;}
}

三、创建消息任务类

        以前我们使用异步多线程的方式发送邮件,那么这次我们要创建的多线程任务类是用来收发RabbitMQ消息的,而且内部包含了同步执行和异步执行两种方式。

 

@Component
@Slf4j
public class MessageTask {@Autowiredprivate ConnectionFactory factory;@Autowiredprivate MessageService messageService;/*** 同步发送消息** @param topic 主题* @param entity 消息对象*/public void send(String topic, MessageEntity entity) {// 向MongoDB保存消息数据,返回消息IDString id = messageService.insertMessage(entity);// 向RabbitMQ发送消息try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {// 连接到某个Topicchannel.queueDeclare(topic, true, false, false, null);// 存放属性数据HashMap map = new HashMap();map.put("messageId", id);// 创建AMQP协议参数对象,添加附加属性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();channel.basicPublish("", topic, properties, entity.getMsg().getBytes());log.debug("消息发送成功");} catch (Exception e) {log.error("执行异常", e);throw new EmosException("向MQ发送消息失败");}}@Asyncpublic void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/*** 同步接收数据** @param topic 主题* @return 接收消息数量*/public int receive(String topic) {int i = 0;// 接收消息数据try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {// 从队列中获取消息,不自动确认channel.queueDeclare(topic, true, false, false, null);// Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环while (true) {// 创建响应接收数据,禁止自动发送Ack应答GetResponse response = channel.basicGet(topic, false);if (response != null) {AMQP.BasicProperties properties = response.getProps();// 获取附加属性对象Map<String, Object> map = properties.getHeaders();String messageId = map.get("messageId").toString();// 获取消息正文byte[] body = response.getBody();String message = new String(body);log.debug("从RabbitMQ接收的消息:" + message);MessageRefEntity entity = new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);// 把消息存储在MongoDB中messageService.insertRef(entity);// 数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息long deliveryTag = response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i++;}else {// 接收不到消息,则退出死循环break;}}} catch (Exception e) {log.error("执行异常", e);throw new EmosException("接收消息失败");}return i;}@Asyncpublic int receiveAsync(String topic) {return receive(topic);}/*** 同步删除消息队列** @param topic 主题*/public void deleteQueue(String topic){try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {channel.queueDelete(topic);log.debug("消息队列成功删除");}catch (Exception e) {log.error("删除队列失败", e);throw new EmosException("删除队列失败");}}@Asyncpublic void deleteQueueAsync(String topic){deleteQueue(topic);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/6033.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习小结之决策树

文章目录 前言一、介绍1.1 原理1.2 流程1.3 信息熵&#xff0c;信息增益和基尼不纯度 二、构建决策树2.1 特征选择2.2 决策树生成2.3 剪枝 三、经典算法3.1 ID33.2 C4.53.3 CART 四、案例4.1 Iris 数据集 鸢尾花 分类4.2 基于决策树的英雄联盟游戏胜负预测 参考 前言 决策树(D…

盒马上市,即时零售最大“变量”

若盒马年内成功上市&#xff0c;等待完成下一轮融资的朴朴超市的处境恐将更加尴尬&#xff0c;另区域性中小商超或将迎来新一轮倒闭潮。 疫情过后&#xff0c;国内消费市场一直处于走弱态势。据商务大数据监测&#xff0c;今年端午假期&#xff0c;部分地区零售和餐饮数据远不及…

2023年前端面试汇总-React

1. 组件基础 1.1. React事件机制 <div onClick{this.handleClick.bind(this)}>点我</div> React并不是将click事件绑定到了div的真实DOM上&#xff0c;而是在document处监听了所有的事件&#xff0c;当事件发生并且冒泡到document处的时候&#xff0c;React将事…

【Linux】基础IO——文件描述符/缓冲区/重定向/文件系统

文章目录 一、文件描述符二、缓冲区三、重定向的原理四、文件系统 (Linux Ext2)1 认识磁盘的结构CHSLBABlock 2 认识文件系统2.1 分区2.2 文件系统的结构2.3 剖析inode2.4 文件的操作 3 软硬链接3.1 软链接3.2 硬链接 &#x1f4dd; 个人主页 &#xff1a;超人不会飞)&#x1f…

JVM 常量池、即时编译与解析器、逃逸分析

一、常量池 1.1、常量池使用 的数据结构 常量池底层使用HashTable key 是字符串和长度生成的hashValue&#xff0c;然后再hash生成index, 改index就是key&#xff1b;Value是一个HashTableEntry&#xff1b; 1、key hashValue hash string(name&#xff0c; len) i…

LVS负载均衡群集部署——NAT模式

LVS负载均衡群集部署——NAT模式 一、企业群集应用概述1、群集概述2、解决方法3、根据集群针对的目标差异分类 二、负载均衡群集架构三、LVS 的三种工作模式1、NAT 地址转换2、TUN IP隧道 IP Tunnel3、DR 直接路由 Direct Routing 四、LVS的负载调度算法五、ipvsadm工具六、NAT…

linux shell pgrep命令使用方法(pgrep指令)获取进程号、统计进程数量(学会区分Linux进程进程名)

文章目录 问题背景pgrep指令help文档使用示例1. 列出匹配进程的PID和进程名称&#xff08;-l&#xff09;&#xff08;默认只能从进程名的子集字符串匹配&#xff0c;如果要使用完整进程名的子集字符串匹配&#xff0c;请加-f参数&#xff0c;下同&#xff09;2. 列出匹配进程的…

生成古风少女图片【InsCode Stable Diffusion美图活动一期】

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 写在前面 Stable Diffusion 模型在线使用地址&#xff1a; 工具介绍 一.如何使用S…

【NLP】用python实现文本转语音处理

一、说明 介绍一款python调用库&#xff0c;离线软件包pyttsx3 API&#xff0c;它能够将文字转化成语音文件。Python 中有多种 API 可用于将文本转换为语音。pyttsx3 是一个非常易于使用的工具&#xff0c;可将输入的文本转换为音频。与其它类似的库不同&#xff0c;它可以离线…

第十八章 MobileViT网络详解

系列文章目录 第一章 AlexNet网络详解 第二章 VGG网络详解 第三章 GoogLeNet网络详解 第四章 ResNet网络详解 第五章 ResNeXt网络详解 第六章 MobileNetv1网络详解 第七章 MobileNetv2网络详解 第八章 MobileNetv3网络详解 第九章 ShuffleNetv1网络详解 第十章…

Qt/C++编写跨平台的推流工具(支持win/linux/mac/嵌入式linux/安卓等)

一、前言 跨平台的推流工具当属OBS最牛逼&#xff0c;功能也是最强大的&#xff0c;唯一的遗憾就是多路推流需要用到插件&#xff0c;而且CPU占用比较高&#xff0c;默认OBS的规则是将对应画布中的视频画面和设定的音频一起重新编码再推流&#xff0c;意味着肯定占用不少CPU资…

passware kit forensic使用

一、从外部注册表文件提取密码 适用于不联网的情况下&#xff0c;例如2023盘古石杯的NAS取证 找到Windows/System32/config并在本地打开 将路径填充到config folder中 跑出来了John电脑对应的密码是paofen&#xff0c;NAS的密码是P88w0rd 后续遇见再补