1.简单队列
-
消息生产者
public class Send {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "这是一条消息!!!";// 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}} }
-
消息消费者(会一直监听队列)
public class Recv {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv:" + message);};// 自动确认消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});} }
2.工作队列
-
工作队列
- 消息生产能力大于消费能力,增加多个消费节点
- 和简单队列类似,增加多个消费节点,处于竞争关系
- 默认策略:round robin轮训
-
生产者
public class Send {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 批量发送10个消息for (int i = 0; i < 10; i++) {String message = "这是一条消息!!!" + i;// 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}}} }
-
消费者1
public class Recv1 {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 模拟消费者缓慢try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);// 手工确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 关闭自动确认消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
-
消费者2
public class Recv2 {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 模拟消费者缓慢try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);// 手工确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 关闭自动确认消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
-
轮训策略验证
- 先启动两个消费者,再启动生产者
- 缺点:存在部分节点消费过快,部分节点消费慢,导致不能合理处理消息
-
公平策略验证
- 修改消费者策略
- 解决消费者能力消费不足的问题,降低消费时间问题
3.RabbitMQ的Exchange交换机
- 生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系
- 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定或者没有符合的路由规则,则消息会被丢失
- RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后一种基本不用
- 交换机类型
- Direct exchange定向
- 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
- 处理路由键
- Fanout exchange广播
- 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息
- Fanout交换机转发消息是最快的,用于发布订阅广播形式
- 不处理路由键
- Topic exchange通配符
- 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
- 将路由键和某模式进行匹配,此时队列需要绑定在一个模式上
- 符号"#“匹配一个或多个词,符号”*"匹配不多不少一个词
- Headers exchange(很少用)
- 根据发送的消息内容中的headers属性进行匹配,在绑定Queue与Exchange时指定一组键值对
- 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配
- 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
- 不处理路由键
- Direct exchange定向
4.发布订阅模型
-
什么是RabbitMQ的发布订阅模式
- 发布订阅模型中,消息生产者不再是直接面对队列,而是直面交换机,都需要经过交换机来进行消息的发送,所有发往同一个fanout交换机的消息都会被所有监听这个交换机的消费者接收
- 发布订阅模型引入fanout交换机
-
发布订阅模型应用场景
- 微信公众号
- 新浪微博关注
-
RabbitMQ发布订阅模型
- 通过把消息发送给交换机,交换机转发给对应绑定的队列
- 交换机绑定的队列是排他独占队列,自动删除
-
发送端
public class Send {private static final String EXCHANGE_NAME = "fan_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,广播类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "广播发送消息:这是一条消息!!!";// 发送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}} }
-
消费端(两个节点)
public class Recv1 {private static final String EXCHANGE_NAME = "fan_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,广播类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 获取队列(排它队列)String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
5.路由模式
-
什么是RabbitMQ的路由模式
-
交换机类型是direct
-
队列和交换机绑定,需要指定一个路由键(也叫binding key)
-
消息生产者发送消息给交换机,需要指定路由键
-
交换机根据消息的路由键,转发给对应的队列
-
-
消息生产者
public class Send {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String error = "我是错误日志";String info = "我是info日志";String warning = "我是warning日志";// 发送消息channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8));System.out.println("Send:消息发送成功!");}} }
-
消费者一(只接收错误消息)
public class Recv1 {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
-
消费者二(接收全部消息)
public class Recv2 {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
6.主题通配符模式
-
什么是RabbitMQ的主题模式
- 交换机是topic,可以实现发布订阅模式fanout和路由模式direct的功能,更加灵活,支持通配符匹配
- 交换机通过通配符进行转发到对应的队列,*代表一个词,#代表1个或多个词,一般用#作为通配符居多,词与词之间使用.点进行分割
- 注意:交换机和队列绑定时用的binding使用通配符的路由键;生产者发送消息时需要使用具体的路由键
-
生产者
public class Send {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String error = "我是错误日志";String info = "我是info日志";String warning = "我是warning日志";// 发送消息channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8));System.out.println("Send:消息发送成功!");}} }
-
消费者一(只接收错误消息)
public class Recv1 {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
-
消费者二(接收全部消息)
public class Recv2 {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "#");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});} }
7.工作模式总结
-
简单模式
- 一个生产者一个消费者,不用指定交换机,使用默认交换机
-
工作队列模式
- 一个生产者多个消费者,可以有轮训和公平策略,不用指定交换机,使用默认交换机
-
发布订阅模式
- fanout类型交换机,通过交换机和队列绑定,不用指定绑定路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingkey路由键
-
路由模式
- direct类型交换机,通过交换机和队列绑定,指定绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要指定routingkey路由键
-
通配符模式
- topic交换机,通过交换机和队列绑定,指定绑定的通配符路由键,生产者发送消息到交换机,交换机根据消息的路由键进行转发到对应的队列,消息要指定routingkey路由键