MQ基础

news/2025/2/21 20:33:14/文章来源:https://www.cnblogs.com/Helix6/p/18725783

MQ基础认识

MQ结合JAVA客户端

依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

spring:rabbitmq:host: 8.217.155.191port: 5672virtual-host: / #虚拟主机username: admin #用户名password: 123456 #密码

快速入门案例

发送消息

/**
* 发送消息到队列simple.queue
*/
@Test
public void sendMsg2Queue() {
String queueName = "simple.queue";String message = "hello";rabbitTemplate.convertAndSend(queueName,message);
}

消费者接收消息

/**
* 监听simple.queue队列的消费者
* @param msg
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {System.out.println("simple.queue接收到的消息为:"+msg);
}

work模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

模拟WorkQueue,实现一个队列绑定多个消费者:
1.在RabbitMO的控制台创建一个队列,名为work.queue
2.在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
3.在consumer服务中定义两个消息监听者,都监听work.queue队列
4.消费者1每秒处理50条消息,消费者2每秒处理5条消息

发送消息:

/**
* 发送消息到队列work.queue
*/
@Test
public void sendMsg2QueueWork() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,"msg-"+i);Thread.sleep(20);}
}

消费者1、1

/**
* 监听work.queue队列的消费者1
* @param msg
*/
@RabbitListener(queues = "work.queue")
public void listenWorkQueue_1(String msg) {System.out.println("work.queue的消费者1接收到的消息为:"+msg);
}/**
* 监听work.queue队列的消费者2
* @param msg
*/
@RabbitListener(queues = "work.queue")
public void listenWorkQueue_2(String msg) throws InterruptedException {System.err.println("work.queue的消费者2接收到的消息为:"+msg);Thread.sleep(200);
}

设置预分配参数,确保消息被消费后才发下一个消息,不分配参数时,每一个消费者都会被发送一个消息。

spring:rabbitmq:listener:simple:prefetch: 1 #预分配数量

fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
Fanout:广播、Direct:定向、Topic:话题
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

利用SpringAMQP演示FanoutExchange的使用:
1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
4.在publisher中编写测试方法,向hmall.fanout发送消息

发送消息:

/**
* 发送消息到交换机hmall.fanout
*/
@Test
public void sendMsg2fanout() {String exchangeName = "hmall.fanout";String message = "hello";rabbitTemplate.convertAndSend(exchangeName,null,message);
}

消费者:

/**
* 监听fanout.queue1队列的消费者
* @param msg
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("fanout.queue1的消费者接收到的消息为:"+msg);
}/**
* 监听fanout.queue2队列的消费者
* @param msg
*/
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.err.println("fanout.queue2的消费者接收到的消息为:"+msg);
}

交换机的作用是什么:接收publisher发送的消息,将消息按照规则路由到与之绑定的队列,FanoutExchange的会将消息路由到每个绑定的队列

direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
1.每一个Queue都与Exchange设置一个BindingKey
2.发布者发送消息时,指定消息的RoutingKey
3.Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

利用SpringAMQP演示DirectExchange的使用:
1.在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
2.在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
4.在publisher中编写测试方法,利用不同的RoutingKey向hmal.direct发送消息

发送消息:

/**
* 发送消息到交换机hmall.direct
*/
@Test
public void sendMsg2direct() {String exchangeName = "hmall.direct";rabbitTemplate.convertAndSend(exchangeName,"red","red_msg");rabbitTemplate.convertAndSend(exchangeName,"blue","blue_msg");rabbitTemplate.convertAndSend(exchangeName,"yellow","yellow_msg");
}

消费者

/**
* 监听direct.queue1队列的消费者
* @param msg
*/
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("direct.queue1的消费者(red,blue)接收到的消息为:"+msg);
}/**
* 监听direct.queue2队列的消费者
* @param msg
*/
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("direct.queue2的消费者(red,yellow)接收到的消息为:"+msg);
}

topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符

:代指0个或多个单词,*:代指一个单词

利用SpringAMQP演示DirectExchange的使用:
1.在RabbitMO控制台中,声明队列topic.queue1和topic.queue2
2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

发送消息

/**
* 发送消息到交换机hmall.topic
*/
@Test
public void sendMsg2topic() {String exchangeName = "hmall.topic";rabbitTemplate.convertAndSend(exchangeName,"china.news","china news");rabbitTemplate.convertAndSend(exchangeName,"china.food","china food");rabbitTemplate.convertAndSend(exchangeName,"japan.news","japan news");rabbitTemplate.convertAndSend(exchangeName,"news","news");rabbitTemplate.convertAndSend(exchangeName,"china","china");
}

消费者

/**
* 监听topic.queue1队列的消费者
* @param msg
*/
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {System.out.println("topic.queue1的消费者(china.#)接收到的消息为:"+msg);
}/**
* 监听topic.queue2队列的消费者
* @param msg
*/
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {System.out.println("topic.queue2的消费者(#.news)接收到的消息为:"+msg);
}

声明队列和交换机

方式一:写bean
FanoutConfiguration.java

/*** 写bean配置交换机和队列*/
@Configuration
public class FanoutConfiguration {//交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange("hmall.topic1");}//队列1@Beanpublic Queue topicQueue3() {return new Queue("topic.queue3");}//队列2@Beanpublic Queue topicQueue4() {return new Queue("topic.queue4");}//binding@Beanpublic Binding topicQueue3Binding(Queue topicQueue3,TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue3).to(topicExchange).with("china.#");}//binding@Beanpublic Binding topicQueue4Binding(Queue topicQueue4,TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue4).to(topicExchange).with("#.news");}
}

方式二:在消费者上写注解

/**
* 监听topic.queue3队列的消费者
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue3"),exchange = @Exchange(value = "hmall.topic1",type = ExchangeTypes.TOPIC),key = {"china.#"}
))
public void listenTopicQueue3(String msg) {System.out.println("topic.queue3的消费者(china.#)接收到的消息为:"+msg);
}/**
* 监听topic.queue4队列的消费者
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue4"),exchange = @Exchange(value = "hmall.topic1",type = ExchangeTypes.TOPIC),key = {"#.news"}
))
public void listenTopicQueue4(String msg) {System.out.println("topic.queue4的消费者(#.news)接收到的消息为:"+msg);
}

消息转换器

测试利用SpringAMQP发送对象类型的消息:
1.声明一个队列,名为object.queue
2.编写单元测试,向队列中直接发送一条消息,消息类型为Map
3.在控制台查看消息,总结你能发现的问题

发现mq收到的消息为乱码,应该改变原来的序列化器为json类型
坐标

<!--Jackson-->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

在消费者和生产者端都配置:

@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/887042.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel 自动换行后批量设置单元格上下边距

excel 自动换行后 单元格的上下边距挨的很紧,看起来很不舒服 如下图 现在教大家 如何批量设置 单元格中有多行文本后的上下间距,设置成功后 如下图所示:具体步骤: 1、鼠标悬停在 excel 中的任意sheet(页签)上,右键 选择 查看代码 2、在弹出的 Visual Basic 编辑器弹框左…

开发者必备!Github Stars 存储库管理器!

gitstars —— 一个基于 Vite + Vue.js 构建的 GitHub Star 仓库管理器,旨在帮助开发者更好的管理、搜索、查阅收藏的开源项目。大家好,我是 Java陈序员。 Github 作为全球最大的开发者交流平台,拥有数不胜数的开源项目,我们会经常收藏一些开源项目,方便工作和学习。 但随…

1. Linux下 MySQL 的详细安装与使用

1. Linux下 MySQL 的详细安装与使用 @目录1. Linux下 MySQL 的详细安装与使用1. Linux 下安装 MySQL8.0 的详细安装步骤:2. Linxu 当中的MySQL 设置远程登录3. 最后:1. Linux 下安装 MySQL8.0 的详细安装步骤:查看是否安装过MySQL,如果你是用rpm安装, 检查一下RPM PACKAGE:…

临时编辑-----WordPress后台用户手册

登录 WordPress后台默认的登录链接是: https://yourdomain.com/wp-admin/ 输入你的账号(可以是邮箱,也可以是昵称)和密码,即可登录。然后就会进入到你的WordPress网站后台。 注意:忘记你的WordPress密码也不用太慌张,可以从服务器后台进入到WordPress后台。 进来的界面就…

15. Docker容器监控之(CAdvisor+InfluxDB+Granfana)的详细安装和常规使用

15. Docker容器监控之(CAdvisor+InfluxDB+Granfana)的详细安装和常规使用 @目录15. Docker容器监控之(CAdvisor+InfluxDB+Granfana)的详细安装和常规使用1. CAdvisor监控收集+InfluxDB存储数据+Granfana展示图表 的概述1.1 CAdvisor 监控收集1.2 InfluxDB 存储数据1.3 Granfana…

c#中GDI+使用贝塞尔曲线画一朵云

主要是路径的计算 先得到路径if (value.Width > 0 && value.Height > 0) {GraphicsPath.AddBezier(new PointF(RectangleF.Left + RectangleF.Width * 0.1f, RectangleF.Top + RectangleF.Height * 0.55f),new PointF(RectangleF.Left + RectangleF.Width * 0.1f…

如何让你的ida 地址就是RVA

如何让你的ida 地址就是RVA 原理:首先软件进入ida中的时候,其实就相当于把应用程序载入了内存中,RVA其实就是虚拟的便宜地址,也就是在内存中的地址,所以这里的RVA就是在ida中看到的地址-Imagebase Imagebase 其实就是基址。所以我们只需要把imagebase 设置为0即可拿到我…

c#GDI+实现类似油门踏板效果的自定义控件

先看效果图下面是代码protected override void OnPaint(PaintEventArgs e) {e.Graphics.SetGDIHigh();var rect = new Rectangle(0, 0, this.Width, this.Height);// 创建变换矩阵Matrix transformMatrix = new Matrix();// 使用平行四边形的方法近似梯形transformMatrix.Shear…

NocoBase 本周更新汇总:支持全局和批量数据触发自定义操作事件

本周更新包括:支持全局和批量数据触发自定义操作事件,支持数据表预置字段扩展等。汇总一周产品更新日志,最新发布可以前往我们的博客查看。 NocoBase 目前更新包括的版本更新包括三个分支:main ,next和 develop。main :截止目前最稳定的版本,推荐安装此版本。 next:包含…

如何解决远程运维文件传输过程中,面临的安全与效率难题?

堡垒机是一种位于特定网络环境中,用于保障网络和数据安全的设备或系统。它通过集中管理和控制所有远程访问请求,确保每一次操作都经过严格的认证和授权。当用户采用堡垒机开展远程运维和远程访问控制时,需要将安装包、升级包、脚本、工具软件等文件资源,从外部非受控区转移…

20250220

1. 橡胶参考昨晚策略 市场休息 我们也休息。 2. 花生有个多5浪的机会

跨网文件传输的安全性如何保障?5大防护策略在这里

一、跨网文件传输的安全风险有哪些? 1、数据泄露风险 跨网数据交换时,敏感信息可能会在没有足够保护的情况下被泄露,尤其是在内网向外网传输(出网)和外网向内网传输(入网)的过程中。内部人员可能因为疏忽或恶意行为导致数据泄露,跨网数据交换系统需要能够有效防止内部泄…