RabbitMQ

一、初识 MQ

1. 同步通讯

时效性强,立即获取结果

微服务间基于 Feign 的调用就属于同步

方式,存在一些问题:

① 耦合度高

② 性能和吞吐能力不如异步

③ 额外资源消耗

④ 级联失败问题

2. 异步通讯

异步调用常见实现就是事件驱动模式

优点:

① 服务解耦

② 性能提升,吞吐量提高

③ 服务没有强依赖,不担心级联问题

④ 流量削峰

缺点

① 依赖 Broker(事件代理者) 的可靠性、

    安全性、吞吐能力

② 架构复杂的情况下,业务没有明显

    的流程线,不好追踪管理

3. MQ 常见框架

MQ (MessageQueue),中文是消息队列,

字面来看就是存放消息的队列,也就是事

件驱动架构中的 Broker

二、RabbitMQ 快速入门

1. RabbitMQ 概述

RabbitMQ 是基于 Erlang 语言开发的开源

消息通信中间件

官网地址:https://www.rabbitmq.com/

channel(通道):操作 MQ 的工具

exchange(交换机):路由消息到队列中

queue(队列):缓存消息

virtual host:虚拟主机,是对 queue、

             exchange 等资源的逻辑分组

2. 常见消息模型

(1) 基本消息队列(BasicQueue)

 1) 官方的 HelloWorld 是基于最基础的

    消息队列模型来实现的,只包括三个

    角色:

publisher:消息发布者,将消息发送到队列

queue queue:消息队列,负责接受并缓存消息

consumer:订阅队列,处理队列中的消息

  2) 基本消息队列的消息发送流程:

① 建立 connection

② 创建 channel

③ 利用 channel 声明队列

④ 利用 channel 向队列发送消息

  3) 基本消息队列的消息接收流程:

① 建立 connection

② 创建 channel

③ 利用 channel 声明队列

④ 定义 consumer 的消费行为

    handleDelivery()

⑤ 利用 channel 将消费者与队列

     绑定 

(2) 工作消息队列(WorkQueue)

可以提高消息处理速度,避免队列消

息堆积 

(3) 发布订阅(Publish、Subscribe)

允许将同一消息发送给多个消费者

实现方式是加入了 exchange(交换机)

注意:exchange 负责消息路由,而

      不是存储,路由失败则消息丢失

根据交换机类型不同分为三种:

1) Fanout Exchange:广播

将接收到的消息广播到每一个跟其

绑定的 queue

  

2) Direct Exchange:路由

将接收到的消息根据规则路由到指定

的 Queue,因此称为路由模式(routes)

① 每一个 Queue 都与 Exchange 设置

    一个 BindingKey

② 发布者发送消息时,指定消息的

    RoutingKey

③ Exchange 将消息路由到 BindingKey

    与消息 RoutingKey 一致的队列

  

3) Topic Exchange:主题

TopicExchange 与 DirectExchange 类

似,区别在于 routingKey 必须是多个

单词的列表,并且 . 分割

Queue 与 Exchange 指定 BindingKey 时

可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

三、SpringAMQP

SpringAmqp 的官方地址:

https://spring.io/projects/spring-amqp

AMQP,即 Advanced Message Queuing

Protocol,一个提供统一消息服务的应用

层标准高级消息队列协议,基于此协议的

客户端与消息中间件可传递消息,并不受

客户端/中间件不同产品,不同的开发语言

等条件的限制

Spring AMQP 项目将核心 Spring 概念应

用于基于 AMQP 的消息传递解决方案的

开发的一套 API 规范,它提供了一个模板

作为发送和接收消息的高级抽象

其中 spring-amqp基础抽象

spring-rabbit 是底层的默认实现

1. 利用 SpringAMQP 实现 HelloWorld 中的基

    础消息队列功能

  (1) 在父工程中引入 spring-amqp 的依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  (2) 在 publisher 服务中利用 RabbitTemplate

      发送消息到 simple.queue 这个队列

   ① 在 publisher 服务中编写 application.yml,

       添加 mq 连接信息:

spring:rabbitmq:host: 192.168.150.110 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: rootpassword: 123456

  ② 在 publisher 服务中新建一个测试类,

       编写测试方法:

public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp";rabbitTemplate.convertAndSend(queueName, message);}
}

(3) 在 consumer 服务中编写消费逻辑,绑定

    simple.queue 这个队列 

  ① 在 consumer 服务中编写 application.yml,

      添加 mq 连接信息:

spring:rabbitmq:host: 192.168.150.110 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: rootpassword: 123456

  ② 在 consumer 服务中新建一个类,编写消

      费逻辑: 

@Component
public class SpringRabbitListener {@RabbitListener(queues = {"simple.queue"})public void listenSimpleQueue(String msg) {System.out.println(msg);}
}

注意:消息一旦消费就会从队列删除,

          RabbitMQ 没有消息回溯功能

2. 使用 WorkQueue 实现一个队列绑定多个

    消费者

  (1) 生产者循环发送消息到 simple.queue

@Test
public void testSimpleQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, spring amqp";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

  (2) 编写两个消费者,都监听 simple.queue

@Component
public class SpringRabbitListener {@RabbitListener(queues = {"simple.queue"})public void listenSimpleQueue1(String msg) throws InterruptedException {System.out.println("消费者1" + "【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = {"simple.queue"})public void listenSimpleQueue2(String msg) throws InterruptedException {System.err.println("消费者2" + "【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
}

AMQP 有一个消息预取机制,设置 preFetch

这个值,可以控制预取消息的上限:

spring:rabbitmq:host: 190.92.246.107 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: rootpassword: 123456listener:simple:prefetch: 1

3. FanoutExchange 的使用

(1) 在 consumer 服务中,利用代码声明队

     列、交换机,并将两者绑定

@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("root.fanout");}@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}@Beanpublic Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

(2) 在 consumer 服务中,编写两个消费者

     方法,分别监听 fanout.queue1 和

     fanout.queue2

@RabbitListener(queues = {"fanout.queue1"})
public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("fanout.queue1消费者" + "【" + msg + "】" + LocalTime.now());
}@RabbitListener(queues = {"fanout.queue2"})
public void listenFanoutQueue2(String msg) throws InterruptedException {System.err.println("fanout.queue2消费者" + "【" + msg + "】" + LocalTime.now());
}

(3) 在 publisher 中编写测试方法,向

     itcast.fanout 发送消息

@Test
public void testSendFanoutExchange() {String exchangeName = "root.fanout";String message = "hello everyone";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

交换机的作用:

① 接收 publisher 发送的消息

② 将消息按照规则路由到与之绑定的

    队列

③ 不能缓存消息,路由失败,消息丢

    失

④ FanoutExchange 的会将消息路由

    到每个绑定的队列

声明队列、交换机、绑定关系的 Bean:

Queue

FanoutExchange

Binding

4. DirectExchange 的使用 

(1) 利用 @RabbitListener 声明 Exchange、

     Queue、RoutingKey

(2) 在 consumer 服务中,编写两个消费者

     方法,分别监听 direct.queue1 和

     direct.queue2

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),key = {"blue","red"}
))
public void listenDirectQueue1(String msg) {System.err.println("direct.queue1消费者" + "【" + msg + "】" + LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT),key = {"yellow","red"}
))
public void listenDirectQueue2(String msg) {System.err.println("direct.queue2消费者" + "【" + msg + "】" + LocalTime.now());}

(3) 在 publisher 中编写测试方法,向 itcast.

     direct发送消息

@Test
public void testSendDirectExchange() {String exchangeName = "root.direct";String message = "hello red";rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

Direct 交换机与 Fanout 交换机的差异:

① Fanout 交换机将消息路由给每一个

    与之绑定的队列

② Direct 交换机根据 RoutingKey 判断

    路由给哪个队列

③ 如果多个队列具有相同的 RoutingKey,

    则与 Fanout 功能类似

5. TopicExchange 的使用

(1) 利用 @RabbitListene r声明 Exchange、

     Queue、RoutingKey

(2) 在 consumer 服务中,编写两个消费者方

    法,分别监听topic.queue1和topic.queue2

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),key = {"china.#"}
))
public void listenTopicQueue1(String msg) {System.err.println("topic.queue1消费者" + "【" + msg + "】" + LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC),key = {"#.news"}
))
public void listenTopicQueue2(String msg) {System.err.println("topic.queue2消费者" + "【" + msg + "】" + LocalTime.now());
}

(3) 在 publisher 中编写测试方法,向 itcast.

     topic 发送消息

@Test
public void testSendTopicExchange() {String exchangeName = "root.topic";String message = "hello world";rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

6. 消息转换器

在 SpringAMQP 的发送方法中,接收消息

的类型是 Object,也就是说我们可以发送

任意对象类型的消息,SpringAMQP 会帮

我们序列化为字节后发送

SpringAMQP 中消息的序列化和反序列化

是利用 MessageConverter 实现的,默认

是 JDK 的序列化,其中发送方与接收方必

须使用相同的 MessageConverter

推荐用 JSON 方式序列化:

① 先在 publisher 服务引入依赖

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

② 在 publisher 服务声明 MessageConverter

@Bean
public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();
}

③ 在 consumer 服务引入 Jackson 依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

④ 在 consumer 服务定义 MessageConverter:

@Bean
public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();
}

⑤ 定义一个消费者,监听 object.queue 队列

    并消费消息:

@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {System.out.println("收到消息:【" + msg + "】");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/13462.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA+spring boot+mybatis+spring mvc+bootstrap+Mysql停车位管理系统源码

IDEAJavaSpring BootMyBatisshiroLayuiMysql停车位管理系统源码 一、系统介绍1.环境配置 二、系统展示1.登录2.注册3.个人信息4.修改密码5.我的订单6.我的留言7.查车位8.管理员登录9.公告列表10.车位列表11. 订单列表12. 积分排行13. 留言列表14.管理员列表15. 用户列表16.修改…

当型循环和直到型循环(精讲)

目录 背景概念当型循环直到型循环 二维表对比图示与代码当型循环流程图N-S图&#xff08;盒图&#xff09; 直到型循环流程图N-S图&#xff08;盒图&#xff09; 例子当型图示代码 直到型图示代码 Do–Loop 和For –Next相同点&#xff1a;不同点&#xff1a;代码 总结 背景 两…

Java的数据结构

目录 数据结构: 1,数组 2,链表 3,哈希表 4,队列 5,堆 6,栈 7,树 8,图 数据结构: 1,数组 优点: 查找元素的速度很快; 按照索引来遍历数组的速度也很快。 缺点: 数组大小无法改变,一旦创建就无法扩容; 数组只能存储一种数据类型的数据; 插入、修改、删除时比较麻烦&…

第11章:C语言数据结构与算法初阶之排序

系列文章目录 文章目录 系列文章目录前言排序的概念及其运用排序的概念常见的排序算法 常见排序算法的实现1.直接插入排序2. 希尔排序&#xff08;缩小增量排序&#xff09;3. 直接选择排序4. 堆排序5. 冒泡排序6. 快速排序将区间按照基准值划分为左右两半部分的常见方式&#…

<Linux开发>驱动开发 -之- Linux I2C 驱动

&#xff1c;Linux开发&#xff1e;驱动开发 -之- Linux I2C 驱动 交叉编译环境搭建&#xff1a; &#xff1c;Linux开发&#xff1e; linux开发工具-之-交叉编译环境搭建 uboot移植可参考以下&#xff1a; &#xff1c;Linux开发&#xff1e; -之-系统移植 uboot移植过程详细…

flutter聊天界面-加号【➕】更多展开相机、相册等操作Panel

flutter聊天界面-加号【➕】更多展开相机、相册等操作Panel 在之前实现了flutter聊天界面的自定义表情的展示&#xff0c;这里记录一下更多操作展开的相机、相册等操作功能实现。 一、查看效果 更多操作展开的相机、相册等操作功能实现。 二、代码实现 展开的操作按钮可能比…

嵌入式基础知识-总线带宽

带宽&#xff0c;最容易想到的是上网用的网络带宽&#xff0c;在嵌入式软件开发中&#xff0c;也会用到带宽&#xff0c;这个带宽的含义就不一样了&#xff0c;区别是什么&#xff1f;本篇就来介绍一下&#xff0c;并通过一些例子来进行带宽的计算。 先来简单看下不同领域的带…

第二节 给SpringBootAdmin的server端加入spring security安全控制

前言 本来想用一节就写完SpringBootAdmin的&#xff0c;但随着研究的深入发现一节应该是不够的&#xff0c;网上的资料也不会非常系统&#xff0c;官网的例子有些已经好几年没更新了&#xff0c;所以接下来还是系统性的来写下吧 第一节 完成基础配置&#xff0c;暴露所有端点…

有AI助手帮你,刷任何题都不吃力!

Rider如何配置AI助手&#xff1f; 前言&#xff1a;一、选择AI助手二、如何在Rider中配置CodeWhisperer三、使用方法四、分享你的AI解决方案 明明自觉学会了不少知识&#xff0c;可真正开始做题时&#xff0c;却还是出现了“一支笔&#xff0c;一双手&#xff0c;一道力扣&…

html案例2

效果 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, initia…

计算机网络 day4 IP地址的两部分-A、B、C、D、E五类IP地址-私有地址-子网掩码-DNA服务器-域名解析服务

目录 三创网络拓扑结构图&#xff1a; 普通家庭网络拓扑结构图&#xff1a;&#xff08;也可以直接使用 子母路由器 &#xff08;母&#xff1a;无线路由器&#xff09;&#xff08;子&#xff1a;信号放大器、中继器&#xff09;&#xff09; 网络层&#xff1a;&#xff0…

用颜色表示数据的第三个维度

横纵坐标显示时间和空间后&#xff0c;第三个数据的特征有时就不好表示了&#xff0c;3d图有的时候看起来更复杂。对于某些情况&#xff0c;用颜色来表示更加简洁。 这里展示的效果图有点像烟花&#xff0c;所以选了这张&#xff0c;但是换其他的cmap才能使得数据展示更加直观 …