1.拉取镜像
docker pull rabbitmq:3.9.15-management
2.运行容器
docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.15-management
3.访问地址
安装ip加端口号
http://192.168.123.3:15672/
客户端如下:
登录账号密码:
username:guest
password:guest
4.新增用户
创建管理员账号:
admin
admin
点击add user保存
5.新增虚拟空间
名字要以/开头
/mqname1
创建成功
查看是否授予权限
授权给guest用户权限,根据自己需要授权
授权成功
6.原生RabbitMq代码实现
加入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
package com.mq.pruducer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @Author: 简单模式生产者* @Date: 2024/01/29/15:16* @Description: good good study,day day up*/
public class SimpleProducer {/*** 简单模式消息的生产者发送消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址connectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列,ture:只有这个对象可以操作这个队列,其他的对象如果要操作,只能等这个队列操作结束,相当于加锁* 4.在本次连接释放以后,是否删除队列---类似数据库临时表* 5.队列的附加属性*/channel.queueDeclare("simple_queue", true, false, false, null);for (int i = 0; i < 10; i++) {//创建消息String message = "这是RabbitMQ的第" + i + "条消息!";//消息发送/*** 1.交换机* 2.routingkey是什么:简单模式下和队列的名字保持一致* 3.消息的附加属性是什么* 4.消息的内容是什么*/channel.basicPublish("","simple_queue", null, message.getBytes());//关闭资源}channel.close();connection.close();}
}
查看发送消息
发送了10条消息
消费消息
package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** @Author: 简单模式消息消费者* @Date: 2024/01/29/15:31* @Description: good good study,day day up*/
public class SimpleConsumer {/*** 简单模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("simple_queue", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("simple_queue", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}
已经消费
广播模式
package com.mq.pruducer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @Author: 广播模式生产者* @Date: 2024/01/29/15:58* @Description: good good study,day day up*/
public class FanoutProducer {/*** 广播模式消息的生产者发送消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_1", true, false, false, null);channel.queueDeclare("fanout_queue_2", true, false, false, null);//声明交换机/*** 1.交换机的名字* 2.交换机的类型*/channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);//绑定/*** 1.队列* 2.交换机* 3.routingkey*/channel.queueBind("fanout_queue_1", "fanout_exchange", "");channel.queueBind("fanout_queue_2", "fanout_exchange", "");for (int i = 0; i < 10; i++) {//创建消息String message = "这是广播模式的第" + i + "条消息!";//消息发送/*** 1.交换机* 2.routingkey是什么:简单模式下和队列的名字保持一致* 3.消息的附加属性是什么* 4.消息的内容是什么*/if(i % 3 == 0){channel.basicPublish("fanout_exchange","", null, message.getBytes());}else{channel.basicPublish("fanout_exchange","", null, message.getBytes());}//关闭资源}channel.close();connection.close();}
}
消费者
package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @Author: 广播模式消费者1* @Date: 2024/01/29/16:03* @Description: good good study,day day up*/
public class FanoutConsumer1 {/*** 广播模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_1", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("fanout_queue_1", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}
package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @Author: 广播模式消费者2* @Date: 2024/01/29/16:03* @Description: good good study,day day up*/
public class FanoutConsumer2 {/*** 广播模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_2", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("fanout_queue_2", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}
广播模式队列
广播模式交换机
7.springboot整合RabbitMQ
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.4.RELEASE</version></dependency>
server:port: 19012
spring:rabbitmq:host: 192.168.3.123port: 5672virtual-host: /mqname1username: adminpassword: admin
配置类
package com.mq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: 配置类* @Date: 2024/01/29/16:37* @Description: good good study,day day up*/
@Configuration
public class RabbitMQConfig {//创建队列@Bean("myQueue")public Queue myQueue(){return QueueBuilder.durable("springboot_queue").build();}//创建交换机@Bean("myExchange")public Exchange myExchange(){return ExchangeBuilder.topicExchange("springboot_exchange").build();}//创建绑定@Beanpublic Binding myBinding(@Qualifier("myQueue") Queue myQueue,@Qualifier("myExchange") Exchange myExchange){return BindingBuilder.bind(myQueue).to(myExchange).with("user.#").noargs();}
}
发送消息测试
package com.mq.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: 测试类* @Date: 2024/01/29/13:36* @Description: good good study,day day up*/
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/one")public String one(){rabbitTemplate.convertAndSend("springboot_exchange","user.insert","1新增类型的消息");rabbitTemplate.convertAndSend("springboot_exchange","user.update","2修改类型的消息");rabbitTemplate.convertAndSend("springboot_exchange","user.delete","3删除类型的消息");return "发送成功";}}
新建一个监听服务
package com.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Author: 监听类mq* @Date: 2024/01/29/17:19* @Description: good good study,day day up*/
@Component
public class MessageListener {/*** 监听某个队列的消息* @param message 接收到的消息*/@RabbitListener(queues = "springboot_queue")public void myListener1(String message){System.out.println("消费者接收到的消息为:" + message);}
}