RabbitMQ工作模式

news/2024/11/5 21:05:19/文章来源:https://www.cnblogs.com/21CHS/p/18528803

RabbitMQ工作模式

  • RabbitMQ提供了多种工作模式:简单模式,work模式 ,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式等

image-20240806102313708

官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

1、简单模式

生产者直接发送消息到队列上(虽然没有指明使用交换机,但是rabbitmg使用了默认的交换机),只有一个消费者来消费消息。

image-20241025161948904

缺点:当队列中的消息过多,一个消费者的消费能力有限,容易产生消息的积压。

代码实现

1、生产者代码

package com.chs.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest  
public class RabbitMQTest {  // 在简单模式下,没有用到交换机public static final String EXCHANGE_DIRECT = "";// 在简单模式下,消息直接发送到队列,此时生产者端需要把队列名称从路由键参数这里传入public static final String ROUTING_KEY_SIMPLE = "chs.queue.simple";// 注入 RabbitTemplate 执行@Autowired  private RabbitTemplate rabbitTemplate;@Test  public void testSendMessageSimple() {  // 发送消息rabbitTemplate.convertAndSend(  EXCHANGE_DIRECT,   	// 指定交换机名称ROUTING_KEY_SIMPLE, // 指定路由键名称"Hello");   // 消息内容,也就是消息数据本身}  
}

2、消费者=》监听器

  • 使用 @RabbitListener 注解设定要监听的队列名称
  • 消息数据使用和发送端一样的数据类型接收
package com.chs.mq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MyMessageListener {@RabbitListener(queues = {"chs.queue.simple"})public void processMessage(String messageContent, Message message, Channel channel) {System.out.println("messageContent = " + messageContent);}}

2、Work queues(工作模式)工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

为了解决【消息积压】的问题,可以创建多个消费者同时监听一个队列。队列中的消息,就会被多个消费者分滩处理,各占一半

image-20241025162350274

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度

代码实现

生产者代码

package com.chs.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest  
public class RabbitMQTest {  // 在简单模式下,没有用到交换机public static final String EXCHANGE_DIRECT = "";// 在简单模式下,消息直接发送到队列,此时生产者端需要把队列名称从路由键参数这里传入public static final String ROUTING_KEY_SIMPLE = "chs.queue.work";// 注入 RabbitTemplate 执行@Autowired  private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageWork() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY_WORK,"Hello" + i);}}}

image-20241025170544581

消费者代码

package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {@RabbitListener(queues = {"chs.queue.work"})public void  test01(String messageContent, Message message, Channel channel){System.out.println("messageContent1 = " + messageContent);}@RabbitListener(queues = {"chs.queue.work"})public void  test02(String messageContent, Message message, Channel channel){System.out.println("messageContent2 = " + messageContent);}
}

运行效果

多个消费者同时监听一个队列。队列中的消息,就会被多个消费者分滩处理,各占一半

image-20241025171801556

Exchange(交换机)

image-20241025172227917

· P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

· C:消费者,消息的接受者,会一直等待消息到来。

· Queue:消息队列,接收消息、缓存消息。

· Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型

o Fanout:广播,将消息交给所有绑定到交换机的队列

o Direct:定向,把消息交给符合指定routing key 的队列

o Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3、Publish/Subscribe发布订阅模式

交换机类型为fanout

image-20241025172340879

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者发布消息到指定的交换机上,该交换机被绑定了多个队列,交换机就会将当前消息分别转发到各个队列上。

3、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息。

代码实现

创建交换机

注意:发布订阅模式要求交换机是Fanout类型

image-20241025181841979

创建队列并绑定交换机

image-20241025182101506

image-20241025182133444

此时可以到交换机下查看绑定关系:

image-20241025182201538

生产者代码

package com.chs;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01(){//指定交换机名称==》将消息发送给交换机rabbitTemplate.convertAndSend("exchange_fanout","","hello");}
}

消费者代码

两个监听器可以写在同一个微服务中,分别监听两个不同队列

package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void  test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q1队列@RabbitListener(queues = {"Q2"})public void  test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}

运行效果

先启动消费者,然后再运行生产者程序发送消息:

image-20241025182936782

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

4、Routing路由模式

交换机类型为direct

路由模式特点:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。

Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

image-20241025183325497

图解:

· P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

· X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

· C1:消费者,其所在队列指定了需要routing key 为 error 的消息

· C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

代码实现

创建交换机

image-20241025183603864

创建队列并绑定交换机

image-20241025183820729

此时可以到交换机下查看绑定关系:

image-20241025183843419

生产者代码

package com.chs;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01(){// 指定交换机名称和routing key,并使得交换机将消息=》投递到指定key的队列中rabbitTemplate.convertAndSend("exchange_direct","keys","hello");}
}

消费者代码

package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void  test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q2队列@RabbitListener(queues = {"Q2"})public void  test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}

运行结果

消费者1的key为:keys

image-20241025184553108

5、Topics通配符模式

交换机类型为Topic

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配零个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

image-20241025184750942

img

图解:

  • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到

  • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

代码实现

创建交换机

image-20241025185143704

创建队列并绑定交换机

image-20241025185322381

生产者代码

package com.chs;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//匹配通配符  Q1: *.*  //       	 Q2: #.song          rabbitTemplate.convertAndSend("exchange_topic","chs.song","hello1"); // Q1和Q2都匹配rabbitTemplate.convertAndSend("exchange_topic","song.chs.song","hello2"); // 只匹配Q2}
}

消费者代码

package com.chs.listen;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void  test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q2队列@RabbitListener(queues = {"Q2"})public void  test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}

运行结果

image-20241025190041274

队列和交换机绑定时,也是需要指定key,这个key中可以使用通配符

模式总结

1、简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

image-20240806085244893

2、工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

image-20240806085305207

3、发布订阅模式 Publish/subscribe

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

image-20240806085325073

4、路由模式 Routing

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

image-20240806085354471

5、通配符模式 Topic

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

image-20240806085413115

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/827312.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ消息幂等性保障

消息幂等性保障幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果消息幂等性保障 乐观锁机制 @Component public class …

WPF Button控件 这里可以点一下

WPF Button控件 这里可以点一下button表示一个 Windows 按钮控件,该控件对 Click 事件做出反应。 可以点一下button,执行程序操作,如:显示对话框,更改显示内容。 button的content属性表示按钮上显示的文字。<StackPanel><!-- 一个按钮控件,太小了,根本就看不到…

WPF Textbox控件 这里可以输入文字

WPF Textbox控件 这里可以输入文字 textbox控件,用于输入文字。如网页上输入账号密码的地方就是文本框。 文本框的text属性可以提示文字,只能包含无格式文本。

蛋白粉?蛋白质

蛋白粉不能用开水冲,但我们摄入的蛋白质却大都经过了烹煮。 为什么蛋白质不怕开水,而蛋白粉怕开水? 这似乎是矛盾的,其实不然。 问题是,很多人并不了解蛋白质的分子结构,不了解蛋白粉的溶解原理。 如果不了解其中的原理,很容易给出错误的解释。咳咳咳~~干货内容一次可…

【考试题解】多校A层冲刺NOIP2024模拟赛18

目录A. 选彩笔(rgb)题目内容部分分正解思路代码B. 兵蚁排序(sort)题目内容部分分75pts正解思路代码C. 人口局DBA(dba)题目内容部分分60pts正解思路代码D. 银行的源起(banking) A. 选彩笔(rgb) 题目内容 有 \(N\) 支彩笔,每支彩笔有 \(R_i,G_i,B_i\) 三个属性。定义两只彩笔 \(…

学习笔记(二十五):ArkUi-栅格布局 (GridRow/GridCol)

概述: 栅格布局是一种通用的辅助定位工具,对移动设备的界面设计有较好的借鉴作用。主要优势包括:提供可循的规律:栅格布局可以为布局提供规律性的结构,解决多尺寸多设备的动态布局问题。通过将页面划分为等宽的列数和行数,可以方便地对页面元素进行定位和排版。统一的定位…

WPF TextBlock控件 显示两段文字

WPF TextBlock控件 显示两段文字 TextBlock控件不止显示两段文字,可显示多行。<StackPanel><!-- 显示多行,LineBreak表示换行 --><TextBlock>盼望着,<LineBreak />盼望着,<LineBreak />东风来了,春天的脚步近了。</TextBlock><!-…

运筹学两阶段法中的人工变量数量问题

运筹学两阶段法中的人工变量数量问题 在运筹学的两阶段法中,为了找到线性规划(LP)问题初始解的可行性,通常需要在约束条件中引入人工变量。以下是我学习课本后对相关内容的总结。1. 人工变量的引入条件等式约束(=):每一个等式约束需要引入一个人工变量。大于等于约束(≥…

第三十四讲:join语句怎么优化?

第三十四讲:join语句怎么优化? 简概:万年不变的开头 ​ 在上一篇文章中,我和你介绍了 join 语句的两种算法,分别是 Index Nested-Loop Join(NLJ) 和 Block Nested-Loop Join(BNL)。我们发现在使用 NLJ 算法的时候,其实效果还是不错的,比通过应用层拆分成多个语句然后再拼…

Cursor使用

Cursor是一款AI 代码编辑器,官网地址为https://www.cursor.com/,直接在官网下载安装即可,基于VS Code二次开发而来,之所以没有采用插件方式,在官方网站上给出的答案是某些功能插件无法实现,产品专注在使用AI来进行编程方面,价格方面还不便宜,Pro单月20刀,企业版单月单…

在 Vue 2 项目中使用 Element UI

在 Vue 2 项目中使用 Element UI 本实验手册将指导你如何在 Vue 2 项目中使用 Element UI 组件库,搭建一个简单的页面。 一、介绍 Element UI Element UI (Element - 网站快速成型工具)是一套基于 Vue 2.0 的桌面端组件库,提供了丰富的、可复用的 UI 组件,可以帮助开发者快…