一、RabbitMQ有哪些作用
RabbitMQ是一个消息队列中间件,它的作用是利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行的分布式系统的集成,主要作用有以下方面:
-
实现应用程序之间的异步和解耦:通过使用消息队列,应用程序可以通过读写消息来实现通信,无需直接调用彼此,从而减少了对彼此的直接依赖。
-
提供基本的最终一致性实现:RabbitMQ可以帮助实现面向服务的架构(SOA),通过消息队列来通信,从而实现不同服务之间的解耦,并确保最终一致性。
-
实现消息的缓冲和分发:RabbitMQ可以接收并存储大量消息,直到消费者准备好处理它们,从而避免消费者因为数据量过大而无法及时处理消息的情况。
-
支持分布式系统中的事务支持:通过使用消息队列,可以在分布式系统中实现事务的协调和通信。
-
实现RPC调用:通过使用消息队列,可以在分布式系统中实现RPC调用,从而实现不同服务之间的通信。
-
流量控制:通过使用消息队列,将用户请求写入消息队列,按规则读取请求
总的来说,RabbitMQ可以帮助应用程序之间实现解耦、缓冲和分发消息,提供最终一致性实现,支持分布式系统中的事务支持和RPC调用。
二、请列举一个RabbitMQ异步消息处理案例
假设有一个电商系统,包含多个子系统,如订单系统、支付系统、物流系统等。这些子系统之间需要进行高效的通信和协作,才能完成整个电商交易过程。
其中一个典型的场景是,当用户在电商网站上下单后,订单系统需要异步将订单信息发送给支付系统和物流系统,以便它们能够及时处理用户的支付和物流需求。
在这种情况下,可以使用RabbitMQ来实现异步消息处理。具体流程如下:
- 订单系统创建一个订单后,将订单信息发送到RabbitMQ消息队列中。
- 支付系统和物流系统分别订阅该消息队列,并异步处理订单信息。
- 支付系统收到订单信息后,会进行支付处理,并将支付结果发送到RabbitMQ消息队列中。
- 物流系统收到订单信息后,会进行物流处理,并将物流结果发送到RabbitMQ消息队列中。
- 订单系统异步收到支付系统和物流系统的处理结果后,会将订单状态更新为完成或失败,并返回给用户处理结果。
模拟代码:
import com.rabbitmq.client.*; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class RabbitMQExample { private final static String ORDER_QUEUE_NAME = "orders"; private final static String PAYMENT_QUEUE_NAME = "payments"; private final static String SHIPMENT_QUEUE_NAME = "shipments"; public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂并设置参数 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(ORDER_QUEUE_NAME, false, false, false, null); channel.queueDeclare(PAYMENT_QUEUE_NAME, false, false, false, null); channel.queueDeclare(SHIPMENT_QUEUE_NAME, false, false, false, null); // 发送订单消息 String orderMessage = "New order created"; channel.basicPublish("", ORDER_QUEUE_NAME, null, orderMessage.getBytes("UTF-8")); System.out.println(" [x] Sent order message: " + orderMessage); // 发送支付消息 String paymentMessage = "Payment received"; channel.basicPublish("", PAYMENT_QUEUE_NAME, null, paymentMessage.getBytes("UTF-8")); System.out.println(" [x] Sent payment message: " + paymentMessage); // 发送物流消息 String shipmentMessage = "Shipment sent"; channel.basicPublish("", SHIPMENT_QUEUE_NAME, null, shipmentMessage.getBytes("UTF-8")); System.out.println(" [x] Sent shipment message: " + shipmentMessage); // 关闭连接和通道 channel.close(); connection.close(); } // 订单处理函数 public static void processOrder(String message) { System.out.println(" [x] Processing order: " + message); } // 支付处理函数 public static void processPayment(String message) { System.out.println(" [x] Processing payment: " + message); } // 物流处理函数 public static void processShipment(String message) { System.out.println(" [x] Processing shipment: " + message); } // 订阅消息并异步处理 public static void subscribeToMessages(String queueName) throws IOException, TimeoutException { // 创建连接工厂并设置参数 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建连接和通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(queueName, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 创建消费者并订阅队列 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); if (queueName.equals(ORDER_QUEUE_NAME)) { processOrder(message); } else if (queueName.equals(PAYMENT_QUEUE_NAME)) { processPayment(message); } else if (queueName.equals(SHIPMENT_QUEUE_NAME)) { processShipment(message); } } }; channel.basicConsume(queueName, true, consumer); }
}
三、Sprngboot如何使用RabbitMQ发送消息
代码示例:
- 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ连接信息
在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 创建消息生产者
@Service
public class MessageProducer { @Autowiredprivate RabbitTemplate rabbitTemplate; public void sendMessage(String message) {rabbitTemplate.convertAndSend("myQueue", message);}
}
- 创建消息消费者
@Service
public class MessageConsumer implements MessageListener { @Overridepublic void onMessage(Message message) throws Exception {System.out.println("Received message: " + new String((byte[]) message.getBody()));}
}
- 在启动类上添加@EnableRabbit注解,启用RabbitMQ自动配置。同时,将消息消费者注册到RabbitMQ队列中。
@SpringBootApplication
@EnableRabbit //启用RabbitMQ自动配置
public class Application implements CommandLineRunner { @Autowiredprivate MessageProducer messageProducer; public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);}
}
四、RabbitMQ消息队列主要有多少种形式的目的地?
RabbitMQ消息队列主要有两种形式的目的地,分别是队列(Queue)和主题(Topic)。
-
队列是点对点的消息容器,它只有一个发送者,但可以由多个消费者消费。发送者将消息发送到队列,消费者从队列中读取消息并进行处理。
-
主题是一种发布/订阅模式的消息容器,它可以包含多个订阅者。发送者将消息发送到主题,订阅者订阅主题并接收消息。多个订阅者可以同时接收同一条消息,但每个订阅者只能接收到符合自己订阅规则的消息。
在RabbitMQ中,队列和主题都是通过exchange来进行路由的。exchange根据指定的路由规则将消息路由到相应的队列中,这个规则可以是基于消息内容的,也可以是基于其他属性的。
五、JMS(Java Message Service)JAVA消息服务与AMQP(Advanced Message Queuing Protocol)消息服务的区别有哪些?
- JMS:基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
- AMQP:高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现
具体区别:
JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
---|---|---|
定义 | Java api | 网络线级协议 |
跨语言 | 否i | 是 |
跨平台 | 否 | 是 |
Model | 提供两种消息模型: (1)、Peer-2-Peer (2)、Pub/sub | 提供了五种消息模型: (1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 本质来讲,后四种和JMS的pub/sub模型没有太大差别, |
仅是在路由机制上做了更详细的划分; | ||
支持消息类型 | 多种消息类型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有消息头和属性) | byte[] 当实际应用时,有复杂的消息,可以将消息序列化后发送。 |
综合评价 | JMS 定义了JAVA API层面的标准;在java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差; | AMQP定义了wire-level层的协议标准;天然具有跨平、跨语言特性。 |