一.RabbitMQ的角色分类
1:none:
- 不能访问management plugin
2:management:查看自己相关节点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3:Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4:Monitoring
- 包含management所有权限
- 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计信息。
5:Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
二.RabbitMQ入门案例 - Simple 简单模式
环境需要
1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程
简单模式
在 RabbitMQ 中,Simple 模式(也称为简单模式或直接模式)是最基础的消息传递模式之一。它适用于一对一的消息传递场景,即一个生产者发送消息到队列,一个消费者从队列中接收并处理消息。生产者 (Producer):
生产者负责创建消息,并将消息发送到指定的队列。它不关心谁会消费这些消息,只是将消息推送到队列中。队列 (Queue):
队列是 RabbitMQ 中的核心组件,用于存储消息。在 Simple 模式下,通常只有一个队列,所有的消息都会被发送到这个队列中。消费者 (Consumer):
消费者从队列中拉取消息并进行处理。在 Simple 模式下,通常只有一个消费者,所有消息都会被这个消费者逐一处理。消息 (Message):
消息是由生产者生成的内容,可以是文本、JSON 数据、二进制数据等。消息会被发送到队列中,等待消费者消费。
简单入门案例
在Idea中新建一个maven项目
导入相关的依赖:Java原生依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.25.0</version></dependency>
rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。
启动RabbitMQ服务:
systemctl start rabbitmq-server
定义生产者:也就是产生消息的对象,需要把生成的消息交给RabbitMQ的队列中
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置连接参数factory.setHost("8.137.76.12");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("admin");//声明连接和通道Connection connection = null;Channel channel = null;try {//创建连接connection = factory.newConnection("生产者");//创建通道channel = connection.createChannel();//声明队列// 5: 申明队列queue存储消息/** 如果队列不存在,则会创建* Rabbitmq不允许创建两个相同的队列名称,否则会报错。** @params1: queue 队列的名称* @params2: durable 队列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */channel.queueDeclare("queue1",false,false,false,null);//发送消息// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("","queue1",null,"hello,i am ok".getBytes());System.out.println("信息发送成功");}catch (Exception e){e.printStackTrace();System.out.println("发送消息异常");}finally {//关闭连接--通道if (channel !=null && channel.isOpen()){try {channel.close();}catch (Exception e){System.out.println("关闭通道异常");}}//关闭连接if (connection != null){try {connection.close();}catch (Exception e){System.out.println("关闭连接失败");}}}} }
执行发送之后,可以使用RabbitMQ的web管理查看通道是否接收到消息:
定义消费者从通道中拿取信息:
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置连接参数factory.setHost("8.137.76.12");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("admin");//声明连接和通道Connection connection = null;Channel channel = null;try {//创建连接connection = factory.newConnection("消费者");//创建通道channel = connection.createChannel();//接收信息// 监听队列,当有消息到达时,执行回调函数channel.basicConsume("queue1", true, new DeliverCallback() {// 当有消息到达时,执行回调函数 @Overridepublic void handle(String s, Delivery delivery) throws IOException {// 打印接收到的消息System.out.println("接收到的消息是:" + new String(delivery.getBody(), "UTF8"));}}, new CancelCallback() {// 当接收失败时,执行回调函数 @Overridepublic void handle(String s) throws IOException {// 打印接收失败的信息System.out.println("接收失败了");}});System.out.println("开始接收消息");System.in.read();}catch (Exception e){e.printStackTrace();System.out.println("发送消息异常");}finally {//关闭连接--通道if (channel !=null && channel.isOpen()){try {channel.close();}catch (Exception e){System.out.println("关闭通道异常");}}//关闭连接if (connection != null){try {connection.close();}catch (Exception e){System.out.println("关闭连接失败");}}}} }
当消息接收之后,我们暂停程序,也就是不释放连接:
我们发现消息队列中就没有消息了
然后释放连接:
没有持久化的队列消失了
三.什么是AMQP
AMQP,全称为高级消息队列协议(Advanced Message Queuing Protocol),是一种开放的、面向消息中间件的线级协议。它由JPMorgan Chase和iMatix公司设计开发,并于2008年首次发布。AMQP旨在提供一种统一的消息传递协议,使得不同的消息中间件产品之间能够实现互操作性。
AMQP的核心概念
-
Exchange(交换机):接收生产者发送的消息,并根据一定的路由规则将这些消息分发到一个或多个队列中。
-
Queue(队列):存储消息的地方,消费者从这里获取并处理消息。
-
Binding(绑定):定义了Exchange与Queue之间的关系,即通过何种规则将消息从Exchange路由到特定的Queue。
-
Routing Key(路由键):一个虚拟地址,通常是一个简单的字符串,用于Exchange决定如何路由消息。
-
Message(消息):应用程序之间需要通过消息代理(如RabbitMQ)进行传递的数据内容。
AMQP的工作流程
- 生产者(Producer)连接到RabbitMQ服务器,并声明一个交换机(Exchange)。
- 生产者发送消息给交换机,同时指定一个路由键(Routing Key)。
- 交换机根据其类型和绑定规则(Bindings)将消息路由到一个或多个队列(Queue)。
- 消费者(Consumer)监听特定的队列,并处理传入的消息。
生产者:
消费者: