RabbitMQ官方文档:RabbitMQ Tutorials — RabbitMQ
一、RabbitMQ安装(Linux下)
你可以选择原始的方式安装配置,也可以使用docker进行安装,方便快捷!
1. 安装docker
没有docker的先安装一下docker,我这里示例的是Ubuntu的环境下:
CentOS下安装docker参考: Docker 从入门到精通-CSDN博客
#更新系统软件包列表
sudo apt update#安装必要的软件包以允许apt通过HTTPS使用存储库
sudo apt install apt-transport-https ca-certificates curl software-properties-common#添加Docker的官方GPG密钥
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg#添加Docker的稳定存储库
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null#更新软件包列表以包含Docker存储库
sudo apt update#安装Docker引擎
sudo apt install docker-ce docker-ce-cli containerd.io#验证Docker是否正确安装
sudo docker run hello-world
2. 安装 RabbitMQ
#搜索并拉取RabbitMQ的Docker镜像
sudo docker pull rabbitmq:management#创建一个RabbitMQ容器并启动它
sudo docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management#检查容器的状态
sudo docker ps
解释一下这个命令的参数:
-d
:表示容器将以后台(守护进程)模式运行。--name my-rabbit
:为容器指定一个名称,这里是"my-rabbit",你也可以根据需要更改它。-p 5672:5672
:将主机的5672端口映射到容器的5672端口,这是RabbitMQ的默认端口。-p 15672:15672
:将主机的15672端口映射到容器的15672端口,这是RabbitMQ管理插件的Web界面端口。rabbitmq:management
:指定要使用的RabbitMQ镜像及其标签。
3. 访问RabbitMQ的Web界面
注意:如果是云服务器,要在安全组配置中放行对应的端口。
使用用户名 guest,密码 guest 登录
二、运行Hello World
1. 初始化工程
(1)在本地IDEA中创建一个空工程:
(2)在这个空工程下添加两个子模块:rabbitmq-producer 和 rabbitmq-consumer
(3)添加rabbitmq依赖
分别在两个子模块的pom文件中添加如下内容:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-antrun-plugin</artifactId><version>1.3</version></plugin></plugins></build>
2. 编写生产者代码
package com.edu.chd.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerHelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.设置连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("123.249.112.12"); //这里对应的启动了rabbitMQ的云服务器的IPfactory.setPort(5672); //默认的端口号就是5672 不写也行//3.创建连接Connection connection = factory.newConnection();//4.创建channerChannel channel = connection.createChannel();//5.创建队列 Queue/*在 RabbitMQ 中,channel.queueDeclare() 方法用于声明一个队列。该方法接受多个参数,下面是对每个参数的解释:queue(队列名称):指定要声明的队列的名称。durable(持久化):指定队列是否是持久化的。当 RabbitMQ 重新启动时,持久化的队列将被保留下来。如果将该参数设置为 true,则队列将被持久化;如果设置为 false,则队列不会被持久化。注意,这里指的是队列本身的持久化,而不是队列中的消息。exclusive(排他性):指定队列是否是排他的。如果将该参数设置为 true,则该队列只能被当前连接的消费者使用,并且在连接关闭时会自动删除该队列。如果设置为 false,则队列可供多个消费者使用。autoDelete(自动删除):指定队列在不再被使用时是否自动删除。如果将该参数设置为 true,则当队列不再被消费者使用时,将自动删除该队列。如果设置为 false,则队列不会自动删除。arguments(参数):指定队列的其他属性和参数,以键值对的形式提供。这些参数在声明队列时提供了一些灵活性和控制选项,以适应不同的应用场景和需求。*///以下代码声明了一个名为 "hello_world" 的队列,该队列是持久化的、非排他的、不自动删除的channel.queueDeclare("hello_world",true,false,false,null);//6.发送消息String body = "hello world! RabbitMQ!";channel.basicPublish("","hello_world",null,body.getBytes());//7.释放资源channel.close();connection.close();}
}
运行生产者代码,访问RabbitMQ的控制页面:
可以看到已经有了一条消息在队列中
3. 编写消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerHelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.设置连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("123.249.112.12");factory.setPort(5672);//3.创建连接Connection connection = factory.newConnection();//4.创建channerChannel channel = connection.createChannel();//5.接收消息//创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时,将自动执行该方法。Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息内容:" + new String(body));}};/*public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {return this.basicConsume(queue, autoAck, "", callback);}参数说明:queue(队列名称):指定要从中消费消息的队列的名称。autoAck(自动确认):指定消费者在接收到消息后是否自动发送确认。如果将该参数设置为 true,消费者在接收到消息后会自动确认,告知 RabbitMQ 该消息已被成功处理。如果设置为 false,则需要手动调用 channel.basicAck() 方法来发送确认。consumerTag(消费者标签):指定消费者的标识符。如果将该参数设置为空字符串 "",RabbitMQ 会自动生成一个唯一的消费者标签。callback(消费者回调):指定一个 Consumer 对象,用于处理接收到的消息。你可以实现自己的 Consumer 接口或使用现有的实现。*/channel.basicConsume("hello_world",true,consumer);//不需要关闭资源,因为关闭了就没法从消息队列获取消息了}
}
运行消费者代码,可以看到控制台的输出:
进入RabbitMQ控制页面查看,可以看到有一个消费者,并且刚才的消息已经被消费了: