RabbitMQ工作模式
- RabbitMQ提供了多种工作模式:简单模式,work模式 ,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式等
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
1、简单模式
生产者直接发送消息到队列上(虽然没有指明使用交换机,但是rabbitmg使用了默认的交换机),只有一个消费者来消费消息。
缺点:当队列中的消息过多,一个消费者的消费能力有限,容易产生消息的积压。
代码实现
1、生产者代码
package com.chs.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest { // 在简单模式下,没有用到交换机public static final String EXCHANGE_DIRECT = "";// 在简单模式下,消息直接发送到队列,此时生产者端需要把队列名称从路由键参数这里传入public static final String ROUTING_KEY_SIMPLE = "chs.queue.simple";// 注入 RabbitTemplate 执行@Autowired private RabbitTemplate rabbitTemplate;@Test public void testSendMessageSimple() { // 发送消息rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, // 指定交换机名称ROUTING_KEY_SIMPLE, // 指定路由键名称"Hello"); // 消息内容,也就是消息数据本身}
}
2、消费者=》监听器
- 使用 @RabbitListener 注解设定要监听的队列名称
- 消息数据使用和发送端一样的数据类型接收
package com.chs.mq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MyMessageListener {@RabbitListener(queues = {"chs.queue.simple"})public void processMessage(String messageContent, Message message, Channel channel) {System.out.println("messageContent = " + messageContent);}}
2、Work queues(工作模式)工作队列模式
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
为了解决【消息积压】的问题,可以创建多个消费者同时监听一个队列。队列中的消息,就会被多个消费者分滩处理,各占一半
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度
代码实现
生产者代码
package com.chs.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest { // 在简单模式下,没有用到交换机public static final String EXCHANGE_DIRECT = "";// 在简单模式下,消息直接发送到队列,此时生产者端需要把队列名称从路由键参数这里传入public static final String ROUTING_KEY_SIMPLE = "chs.queue.work";// 注入 RabbitTemplate 执行@Autowired private RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageWork() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY_WORK,"Hello" + i);}}}
消费者代码
package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {@RabbitListener(queues = {"chs.queue.work"})public void test01(String messageContent, Message message, Channel channel){System.out.println("messageContent1 = " + messageContent);}@RabbitListener(queues = {"chs.queue.work"})public void test02(String messageContent, Message message, Channel channel){System.out.println("messageContent2 = " + messageContent);}
}
运行效果
多个消费者同时监听一个队列。队列中的消息,就会被多个消费者分滩处理,各占一半
Exchange(交换机)
· P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
· C:消费者,消息的接受者,会一直等待消息到来。
· Queue:消息队列,接收消息、缓存消息。
· Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
o Fanout:广播,将消息交给所有绑定到交换机的队列
o Direct:定向,把消息交给符合指定routing key 的队列
o Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3、Publish/Subscribe发布订阅模式
交换机类型为fanout
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者发布消息到指定的交换机上,该交换机被绑定了多个队列,交换机就会将当前消息分别转发到各个队列上。
3、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息。
代码实现
创建交换机
注意:发布订阅模式要求交换机是Fanout类型
创建队列并绑定交换机
此时可以到交换机下查看绑定关系:
生产者代码
package com.chs;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01(){//指定交换机名称==》将消息发送给交换机rabbitTemplate.convertAndSend("exchange_fanout","","hello");}
}
消费者代码
两个监听器可以写在同一个微服务中,分别监听两个不同队列
package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q1队列@RabbitListener(queues = {"Q2"})public void test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}
运行效果
先启动消费者,然后再运行生产者程序发送消息:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式本质上是绑定默认交换机
- 发布订阅模式绑定指定交换机
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
4、Routing路由模式
交换机类型为direct
路由模式特点:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
图解:
· P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
· X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
· C1:消费者,其所在队列指定了需要routing key 为 error 的消息
· C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
代码实现
创建交换机
创建队列并绑定交换机
此时可以到交换机下查看绑定关系:
生产者代码
package com.chs;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01(){// 指定交换机名称和routing key,并使得交换机将消息=》投递到指定key的队列中rabbitTemplate.convertAndSend("exchange_direct","keys","hello");}
}
消费者代码
package com.chs.listen;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q2队列@RabbitListener(queues = {"Q2"})public void test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}
运行结果
消费者1的key为:keys
5、Topics通配符模式
交换机类型为Topic
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配零个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
图解:
-
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
-
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
代码实现
创建交换机
创建队列并绑定交换机
生产者代码
package com.chs;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class ProduceAppTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//匹配通配符 Q1: *.* // Q2: #.song rabbitTemplate.convertAndSend("exchange_topic","chs.song","hello1"); // Q1和Q2都匹配rabbitTemplate.convertAndSend("exchange_topic","song.chs.song","hello2"); // 只匹配Q2}
}
消费者代码
package com.chs.listen;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerListen {//监听Q1队列@RabbitListener(queues = {"Q1"})public void test01(String messageContent, Message message, Channel channel){System.out.println("消费者1 = " + messageContent);}//监听Q2队列@RabbitListener(queues = {"Q2"})public void test02(String messageContent, Message message, Channel channel){System.out.println("消费者2 = " + messageContent);}
}
运行结果
队列和交换机绑定时,也是需要指定key,这个key中可以使用通配符
模式总结
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列