RabbitMQ基本使用,docker安装RabbitMQ,SpringBoot整合RabbitMQ

1.拉取镜像

docker pull rabbitmq:3.9.15-management

2.运行容器

docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.15-management

3.访问地址

安装ip加端口号

http://192.168.123.3:15672/

客户端如下:

在这里插入图片描述
登录账号密码:
username:guest
password:guest

4.新增用户

创建管理员账号:
admin
admin
点击add user保存
在这里插入图片描述

在这里插入图片描述

5.新增虚拟空间

在这里插入图片描述

名字要以/开头
/mqname1
在这里插入图片描述

创建成功
在这里插入图片描述

查看是否授予权限
在这里插入图片描述
授权给guest用户权限,根据自己需要授权
在这里插入图片描述
授权成功
在这里插入图片描述

6.原生RabbitMq代码实现

加入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
package com.mq.pruducer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @Author: 简单模式生产者* @Date: 2024/01/29/15:16* @Description: good good study,day day up*/
public class SimpleProducer {/*** 简单模式消息的生产者发送消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址connectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列,ture:只有这个对象可以操作这个队列,其他的对象如果要操作,只能等这个队列操作结束,相当于加锁* 4.在本次连接释放以后,是否删除队列---类似数据库临时表* 5.队列的附加属性*/channel.queueDeclare("simple_queue", true, false, false, null);for (int i = 0; i < 10; i++) {//创建消息String message = "这是RabbitMQ的第" + i + "条消息!";//消息发送/*** 1.交换机* 2.routingkey是什么:简单模式下和队列的名字保持一致* 3.消息的附加属性是什么* 4.消息的内容是什么*/channel.basicPublish("","simple_queue", null, message.getBytes());//关闭资源}channel.close();connection.close();}
}

查看发送消息
在这里插入图片描述

在这里插入图片描述
发送了10条消息
在这里插入图片描述

消费消息

package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.io.UnsupportedEncodingException;/*** @Author: 简单模式消息消费者* @Date: 2024/01/29/15:31* @Description: good good study,day day up*/
public class SimpleConsumer {/*** 简单模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("simple_queue", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("simple_queue", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}

在这里插入图片描述

已经消费

在这里插入图片描述

广播模式

package com.mq.pruducer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @Author: 广播模式生产者* @Date: 2024/01/29/15:58* @Description: good good study,day day up*/
public class FanoutProducer {/*** 广播模式消息的生产者发送消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_1", true, false, false, null);channel.queueDeclare("fanout_queue_2", true, false, false, null);//声明交换机/*** 1.交换机的名字* 2.交换机的类型*/channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);//绑定/*** 1.队列* 2.交换机* 3.routingkey*/channel.queueBind("fanout_queue_1", "fanout_exchange", "");channel.queueBind("fanout_queue_2", "fanout_exchange", "");for (int i = 0; i < 10; i++) {//创建消息String message = "这是广播模式的第" + i + "条消息!";//消息发送/*** 1.交换机* 2.routingkey是什么:简单模式下和队列的名字保持一致* 3.消息的附加属性是什么* 4.消息的内容是什么*/if(i % 3  == 0){channel.basicPublish("fanout_exchange","", null, message.getBytes());}else{channel.basicPublish("fanout_exchange","", null, message.getBytes());}//关闭资源}channel.close();connection.close();}
}

消费者

package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @Author: 广播模式消费者1* @Date: 2024/01/29/16:03* @Description: good good study,day day up*/
public class FanoutConsumer1 {/*** 广播模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_1", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("fanout_queue_1", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}
package com.mq.consumer;import com.rabbitmq.client.*;import java.io.IOException;/*** @Author: 广播模式消费者2* @Date: 2024/01/29/16:03* @Description: good good study,day day up*/
public class FanoutConsumer2 {/*** 广播模式消息消费者接受消息* @param args*/public static void main(String[] args) throws Exception{//创建连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();//设置RabbitMQ服务主机地址,默认localhostconnectionFactory.setHost("192.168.3.123");//设置RabbitMQ服务端口,默认5672connectionFactory.setPort(5672);//设置虚拟主机名字,默认/connectionFactory.setVirtualHost("/mqname1");//设置用户连接名,默认guestconnectionFactory.setUsername("admin");//设置连接密码,默认guestconnectionFactory.setPassword("admin");//创建连接Connection connection = connectionFactory.newConnection();//创建频道Channel channel = connection.createChannel();//声明队列/*** 1.队列的名字* 2.持久化* 3.是否独占队列* 4.在本次连接释放以后,是否删除队列---临时表* 5.队列的附加属性*/channel.queueDeclare("fanout_queue_2", true, false, false, null);//创建消费者,并设置消息处理:自定义的操作DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 真实自定义处理消息的逻辑* @param consumerTag:消息的标签* @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号* @param properties* @param body:消息的内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "UTF-8");System.out.println("收到的消息的内容为:" + s);long deliveryTag = envelope.getDeliveryTag();//消息的编号String exchange = envelope.getExchange();//交换机的信息String routingKey = envelope.getRoutingKey();//routingKey的信息System.out.println("收到的消息的编号为:" + deliveryTag);System.out.println("收到的消息的所属的为:" + exchange);System.out.println("收到的消息所属的队列为:" + routingKey);//保存消息到数据库}};//消息监听/*** 1.监听队列的名字* 2.是否自动确认消息*/channel.basicConsume("fanout_queue_2", true, defaultConsumer);//关闭资源(不建议关闭,建议一直监听消息)}
}

广播模式队列
在这里插入图片描述
广播模式交换机

在这里插入图片描述

7.springboot整合RabbitMQ

       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.4.RELEASE</version></dependency>
server:port: 19012
spring:rabbitmq:host: 192.168.3.123port: 5672virtual-host: /mqname1username: adminpassword: admin

配置类

package com.mq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: 配置类* @Date: 2024/01/29/16:37* @Description: good good study,day day up*/
@Configuration
public class RabbitMQConfig {//创建队列@Bean("myQueue")public Queue myQueue(){return QueueBuilder.durable("springboot_queue").build();}//创建交换机@Bean("myExchange")public Exchange myExchange(){return ExchangeBuilder.topicExchange("springboot_exchange").build();}//创建绑定@Beanpublic Binding myBinding(@Qualifier("myQueue") Queue myQueue,@Qualifier("myExchange") Exchange myExchange){return BindingBuilder.bind(myQueue).to(myExchange).with("user.#").noargs();}
}

发送消息测试

package com.mq.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: 测试类* @Date: 2024/01/29/13:36* @Description: good good study,day day up*/
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/one")public String one(){rabbitTemplate.convertAndSend("springboot_exchange","user.insert","1新增类型的消息");rabbitTemplate.convertAndSend("springboot_exchange","user.update","2修改类型的消息");rabbitTemplate.convertAndSend("springboot_exchange","user.delete","3删除类型的消息");return "发送成功";}}

新建一个监听服务

package com.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Author: 监听类mq* @Date: 2024/01/29/17:19* @Description: good good study,day day up*/
@Component
public class MessageListener {/*** 监听某个队列的消息* @param message 接收到的消息*/@RabbitListener(queues = "springboot_queue")public void myListener1(String message){System.out.println("消费者接收到的消息为:" + message);}
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/439639.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(九)springboot实战——springboot3下的webflux项目参数验证及其全局参数验证异常处理

前言 在上一节内容中&#xff0c;我们介绍了如何在webflux项目中自定义实现一个全局的异常处理器ErrorWebExceptionHandler&#xff0c;正常情况下其可以处理我们系统的运行时异常&#xff0c;但是无法处理参数验证的异常WebExchangeBindException&#xff0c;所以这里提供另外…

防火墙到防火墙的高可用知识汇总

目录​​​​​​​ 防火墙 防火墙的分类&#xff1a; 防火墙的发展史 传统防火墙&#xff08;包过滤防火墙&#xff09;—— 一个严格的规则表 传统防火墙&#xff08;应用代理防火墙&#xff09;—— 每个应用添加代理 传统防火墙&#xff08;状态检测防火墙&#xff09…

宝塔控制面板配置SSL证书实现网站HTTPS

宝塔安装SSL证书提前申请好SSL证书&#xff0c;如果还没有&#xff0c;先去Gworg里面申请&#xff0c;一般几分钟就可以下来&#xff0c;申请地址&#xff1a;首页-Gworg官方店-淘宝网 一、登录邮箱下载&#xff1a;Gworg证书文件目录 &#xff0c;都会有以下五个文件夹。宝塔…

IBOS代码审计流程-文件上传(超详细)

2.对源代码进行SeayDzend解密 因为在我们打开文件时会有一部分会出现代码是乱码的情况 C:\Users\gaolitao\IBOS\WWW\system\defines 因此我们需要在该源代码的基础上进行解密&#xff0c;这样才能更好的进行代码审计 这里正确填写Zend5.3的加密方式对其进行解密 将解密后的代…

欧拉计划第816题:求大量点的最短距离

本次来解决欧拉计划的第816题: 解: 第一步:最原始的算法 先从简单的情况开始,即原题里的14个点的情况 import mathdef gen_points(n):s = [0] * (2*n)s[0] = 290797for i in range(1, 2*n):s[i] = (s[i - 1] * s[i - 1]) % 50515093p = [(s[2 * i], s[2 * i + 1]) for…

H5 嵌套iframe并设置全屏

H5 嵌套iframe并设置全屏 上图上代码 <template><view class"mp-large-screen-box"><view class"mp-large-screen-count">// 返回按钮<view class"mp-mini-btn-color mp-box-count" hover-class"mp-mini-btn-hover…

PGsql 解析json及json数组

创建测试数据 drop table if exists json_test; create table json_test as select 111 as id, {"nodes":{"1692328028076":{"nodeId":"1692328028076","nodeName":"测试表1","nodeType":"DATACO…

如何在群晖中本地部署WPS Office并实现公网远程访问

文章目录 1. 拉取WPS Office镜像2. 运行WPS Office镜像容器3. 本地访问WPS Office4. 群晖安装Cpolar5. 配置WPS Office远程地址6. 远程访问WPS Office小结 7. 固定公网地址 wps-office是一个在Linux服务器上部署WPS Office的镜像。它基于WPS Office的Linux版本&#xff0c;通过…

ArcGIS Pro 如何计算长度和面积等数据?

要素的几何属性属于比较重要的信息&#xff0c;作为一款专业的GIS软件&#xff0c;ArcGIS Pro自然也是带有计算几何的功能&#xff0c;这里为大家介绍一下计算方法&#xff0c;希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载的矢量数据&#xff0c;除了矢…

Hana SQL+正则表达式

目录 一、Pre 前言 二、知识点拆解 1&#xff09;case when…then…else 2&#xff09;json_value 函数 拓展资料 3&#xff09;CAST 函数 拓展资料 4) ROUND 函数 5&#xff09;occurences_regexpr 函数 拓展资料 6&#xff09;正则表达式 拓展资料 三、整合分析…

设计模式——2_0 职责链(Chain of Responsibility)

楼下一个男人并得要死&#xff0c;那家隔壁的一家唱着留声机&#xff0c;对面是弄孩子。楼上有两人狂笑&#xff1b;还有打牌声&#xff0c;河中的船上有女人哭她死去的母亲。人类的悲欢并不相通&#xff0c;我只觉得他们吵闹 ——鲁迅 文章目录 定义图纸一个例子&#xff1a;如…

SQL Server ISO镜像文件安装

参考&#xff1a;Sql Server ISO镜像文件安装指南_sqlserveriso文件怎么安装-CSDN博客 参考文件中的步骤基本相同&#xff0c;注意两点 1、尽量安装在D盘&#xff0c;有些组件默认必须安装在C盘&#xff0c;有些会报没有目录的情况 需要在D盘创建目录。 2、我没有windows本地…