参考:BV15k4y1k7Ep
RabbitMQ 相关概念及简述中简单介绍了 RabbitMQ 提供的 6 种工作模式。下面以简单模式为例,介绍 RabbitMQ 的使用。
新建工程
先新建 Maven 工程 RabbitMQ 作为父工程,在父工程下新建三个子模块:
- common:公共包
- producer:生产者
- consumer:消费者
在三个模块中添加 amqp-client 依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version>
</dependency>
在 producer 和 consumer 中添加 common 依赖:
<dependency><groupId>com.zhangmingge.rabbitmq</groupId><artifactId>common</artifactId><version>1.0-SNAPSHOT</version>
</dependency>
编写 common
在 common 中添加用于获取 connection 的工具类,后面 producer 和 consumer 都会用到:
package com.zhangmingge.rabbitmq;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {// 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 主机地址;默认为 localhostconnectionFactory.setHost("192.168.88.128");// 连接端口;默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称;默认为 /connectionFactory.setVirtualHost("/vhost");// 连接用户名;默认为 guestconnectionFactory.setUsername("admin");// 连接密码;默认为 guestconnectionFactory.setPassword("123456");// 创建连接return connectionFactory.newConnection();}}
编写 producer
package com.zhangmingge.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {// 创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 要发送的信息String message = "你好;小兔子!";/** 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange* 参数 2:路由 key,简单模式可以传递队列名称* 参数 3:消息其它属性* 参数 4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}
在执行上述的消息发送之后,登录 RabbitMQ 的管理控制台,可以发现队列和其中的消息:
编写 consumer
package com.zhangmingge.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);// 创建消费者:并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/** consumerTag 消息者标签,在 channel.basicConsume 时候可以指定* envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 路由 keySystem.out.println("路由 key 为:" + envelope.getRoutingKey());// 交换机System.out.println("交换机为:" + envelope.getExchange());// 消息 idSystem.out.println("消息 id 为:" + envelope.getDeliveryTag());// 收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};// 监听消息/** 参数 1:队列名称* 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认* 参数 3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, true, consumer);// 不关闭资源,应该一直监听消息// channel.close();// connection.close();}
}
运行 consumer 后,可以看到 consumer 打印的日志消息,每运行一次 producer,consumer 就会对应打印一次消息。
小结
上述的入门案例中使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。