spring-boot对rabbitMQ的操作

一、安装rabbitMQ

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit01 \--hostname rabbit01 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit02 \--hostname rabbit02 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
    

二、在spring-boot中整合

  • 1、引入依赖包

    <!--   spring boot和junit整合单元测试包     -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
    </dependency>
    <!--   rabbitmq的包引入     -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 2、在配置文件中引入配置

    server.port=8000
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    

三、简单模式

  • 1、简单模式就是一个生产者一个消费者

  • 2、生产者代码,运行下面的代码,查看可视化界面,并不存在消息,原因是因为需要手动创建simple_queue这个队列

    @SpringBootTest(classes = ProducerApplication.class)
    public class ProducerTest01 {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test01() {/*** 第一个参数:表示队列的名称* 第二个参数:表示要发送的数据*/rabbitTemplate.convertAndSend("simple_queue", "hello world");}
    }
    
  • 3、运行后查看可视化界面

    在这里插入图片描述

  • 4、定义一个消费者来消费消息

    package com.example.listener01;import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    public class ConsumerListener01 {@RabbitListener(queues = "simple_queue")public void listener01(Message message) {String msg = new String(message.getBody());System.out.println("接收到的消息:" + msg);}
    }
    

四、work工作模式

  • 1、简单的来理解,就是在上面简单模式下增加几个消费者,如同搬砖一样的,一个搬运工搬不过来,多叫几个人来干活的性质,避免消息堆积

  • 2、同样的要先手动创建队列,在生产者端循环发送数据

    @Test
    public void test02() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("work_queue", "hello world");}
    }
    
  • 3、定义2个消费者来一起消费消息

    @Component
    public class ConsumerListener01 {@RabbitListener(queues = "work_queue")public void listener01(Message message) {String msg = new String(message.getBody());System.out.println("消费者1接收到的消息:" + msg);}
    }
    
  • 4、先运行消费者,然后运行生产者

    在这里插入图片描述

五、发布模式

  • 1、发布模式是指发送一个消息,希望在几个消费者那边都能接收到,上面的工作模式,一条消息被一个消费者消费了,另外一个消费者就接收不到消息,在一些场景需要给每个消费者就要用发布者模式
  • 2、根据交换机的模式可以分为以下几种
    • Fanout,广播模式,将消息全部交给所有与之绑定的队列,这里的router key为空字符串
    • Direct,将消息指定到对应的routing key
    • Topic,通配符模式,这里的routing key根据规则匹配
      • *表示一个单词
      • #表示多个单词
一、Fanout模式
  • 1、使用配置文件的方式创建交换机和队列,并且将他们绑定在一起

    package com.example.config;import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitFanoutExchangeConfiguration {// 交换机@Beanpublic Exchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout_exchange").durable(true).build();}// 创建一个队列@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable("fanout_queue1").build();}// 创建一个队列@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable("fanout_queue2").build();}// 队列和交换机绑定@Beanpublic Binding fanoutExchangeQueue01() {// with表示路由keyreturn BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()).with("").noargs();}@Beanpublic Binding fanoutExchangeQueue02() {// with表示路由keyreturn BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()).with("").noargs();}
    }
    
  • 2、生产者发送消息

    @Test
    public void test03() {rabbitTemplate.convertAndSend("fanout_exchange", "", "hello world");
    }
    
  • 3、查看可视化界面,会自动创建一个交换机和两个路由key

    在这里插入图片描述

  • 4、定义消费者

    @Component
    public class ConsumerListener01 {@RabbitListener(queues = "fanout_queue1")public void listener01(Message message) {String msg = new String(message.getBody());System.out.println("消费者1接收到的消息:" + msg);}
    }
    
  • 5、这时候会发现,几个消费者都会同时输出

二、Direct模式
  • 1、其实就是在上面的模式上修改创建交换机的类型,及指定类型,别的都保持不变
三、Topic模式
  • 1、在定义路由key的时候使用*或者#来表示

六、直接在监听上使用注解的方式来创建交换机等

  • 1、正常创建交换机和队列的方式有三种方式

    • 直接在可视化界面手动创建
    • 使用java api方式一个一个创建需要先创建交换机、队列,并且让队列和交换机绑定在一起
    • 直接使用注解的方式来实现
  • 2、使用注解的方式直接来创建交换机和队列

    package com.example.listener04;import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    public class ConsumerListener01 {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_01", durable = "true"),exchange = @Exchange(value = "direct_rabbit_exchange", type = ExchangeTypes.DIRECT),key = {"info", "error"}))public void listener01(Message message) {String msg = new String(message.getBody());System.out.println("消费者1接收到的消息:" + msg);}
    }
    
  • 3、运行后查看rabbitmq可视化界面

  • 4、定义发送消息的方法

    public void test04() {rabbitTemplate.convertAndSend("direct_rabbit_exchange","error","hello world");
    }
    

七、消息丢失

  • 1、消息丢失主要存在的场景
    • 生产者投递消息的时候就丢失,比如写错了交换机的名字
    • 交换机到队列丢失,比如写错了队列名称
    • 队列到接收者数据丢失
一、开启生产者确认机制
  • 1、开启生产者确认机制

    server.port=9000
    spring.rabbitmq.host=123.56.103.229
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    # 开启生产者确认机制
    spring.rabbitmq.publisher-confirm-type=correlated 
    
  • 2、重写RabbitTemplate,只要我们在容器中有一个RabbitTemplate,那么spring boot就不会用对RabbitTemplate自动化配置

    package com.example.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitmqConfiguration {/*** ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {System.out.println("消息正常投递到交换机中");} else {System.out.println("消息投递到交换机失败:"+s);}}});return rabbitTemplate;}}
    
  • 3、发送消息的时候故意写错交换机的名字

    @Test
    public void test04() throws InterruptedException {rabbitTemplate.convertAndSend("direct_rabbit_exchange_xx","error","hello world");Thread.sleep(2000);
    }
    
  • 4、处理消息投送失败的方式

    • 使用数据库表来保存发送失败的消息,主要字段有:消息唯一id、消息内容、重试次数、当前消息发送状态
    • 在消息投送失败的时候重试几次
    • 定时任务将失败的批量再次发送
  • 5、在发送消息的时候传递当前唯一的识别id,这里使用uuid

    @Test
    public void test04() throws InterruptedException {String msgUuid = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(msgUuid);rabbitTemplate.convertAndSend("direct_rabbit_exchange", "error", "hello world", correlationData);Thread.sleep(2000);
    }
    
  • 6、获取当前消息的唯一识别

    @Configuration
    public class RabbitmqConfiguration {/*** ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {System.out.println("消息正常投递到交换机中");} else {String mesId = correlationData.getId();System.out.println(mesId);System.out.println("消息投递到交换机失败:"+s);}}});return rabbitTemplate;}
    }
    
二、交换机到队列的时候出现问题
  • 1、当队列名称写错了,或者不存在的时候会出现这个情况

  • 2、开启生产者回调机制

    server.port=9000
    spring.rabbitmq.host=123.56.103.229
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    # 开启生产者确认机制
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启生产者回调机制
    spring.rabbitmq.publisher-returns=true
    
  • 3、绑定回退函数

    package com.example.config;import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitmqConfiguration {/*** ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);...// 绑定回退机制的回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println(returnedMessage.getMessage());System.out.println(returnedMessage.getReplyCode());System.out.println(returnedMessage.getReplyText());System.out.println(returnedMessage.getExchange());System.out.println(returnedMessage.getRoutingKey());}});return rabbitTemplate;}
    }
三、消息持久化
  • 1、rabbitmq默认是在内存中存储,当服务宕机后数据直接会丢失,消息在spring boot中持久化是因为框架帮你处理了,修改消息是否持久化可以参考下面

    @Test
    public void test04() throws InterruptedException {String msgUuid = UUID.randomUUID().toString().replace("-", "");CorrelationData correlationData = new CorrelationData(msgUuid);MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties(); // 获取到消息属性对象messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); // 设置消息不缓存return message;}};rabbitTemplate.convertAndSend("direct_rabbit_exchange", "error", "hello world",messagePostProcessor, correlationData);Thread.sleep(2000);
    }
    
四、消费者消费消息不丢失
  • 1、在spring boot中消费者应答模式主要有以下几种

    • none自动应答,消费者获取到消息以后直接给rabbitmq返回ack
    • auto(默认模式),由spring boot框架根据业务执行特点决定给rabbitmqack还是uack,业务执行正常完成后返回ack,业务执行中出现异常的时候返回uack
    • manual手动应答模式,由程序员自己根据业务执行特点给rabbitmq返回对应的ack还是uack
  • 2、配置应答模式

    server.port=8000
    spring.rabbitmq.host=123.56.103.229
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    # 配置应答模式
    spring.rabbitmq.listener.simple.acknowledge-mode=auto
    
八、消费限流
  • 1、在消费者端添加配置

    server.port=8000
    spring.rabbitmq.host=123.56.103.229
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.listener.simple.acknowledge-mode=auto
    # 每次处理10个
    spring.rabbitmq.listener.simple.prefetch=10
    

九、死信队列

  • 1、在下面几种情况下会产生死信队列

    • 消息的存活时间到了
    • 队列满了,比如队列只能放10个消息,这时候发送11个消息过来,就有一个消息为死信,在队列中时间最久的那个将为成为死信队列
    • 消费被拒绝了,或者rabbitmq返回uack
  • 2、死信队列的架构图

    在这里插入图片描述

  • 3、创建死信队列

    package com.example.config;import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    public class RabbitDlxExchangeConfiguration {// 创建一个死信交换机@Beanpublic Exchange dlxExchange() {return ExchangeBuilder.fanoutExchange("dlx_exchange").durable(true).build();}// 创建一个死信队列@Beanpublic Queue dlxQueue() {return QueueBuilder.durable("dlx_queue").maxLength(10).build();}// 死信交换机和死信队列绑定@Beanpublic Binding dlxQueueBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dead").noargs();}// 创建一个正常的交换机@Beanpublic Exchange orderExchange() {return ExchangeBuilder.directExchange("order_exchange").durable(true).build();}// 创建一个正常队列@Beanpublic Queue orderQueue() {return QueueBuilder.durable("order_queue").maxLength(10).deadLetterExchange("dlx_exchange"). // 死信队列的交换机deadLetterRoutingKey("dead"). // 死信队列的routingKeybuild();}// 正常交换机和正常队列绑定@Beanpublic Binding orderQueueBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("info").noargs();}
    }
    
  • 4、发送消息

    @Test
    public void test05() throws InterruptedException {for (int i = 0; i < 15; i++) {rabbitTemplate.convertAndSend("order_exchange", "info", "hello world" + i);}Thread.sleep(2000);
    }
    
  • 5、查看可视化界面,进入死信队列的是时间最早的(也就是最先发送的)

    在这里插入图片描述

  • 6、定义消费者

    package com.example.listener05;import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    public class ConsumerListener01 {@RabbitListener(queues = "dlx_queue")public void listener01(Message message) {String msg = new String(message.getBody());System.out.println("接收到的死信队列消息:" + msg);}@RabbitListener(queues = "order_queue")public void listener02(Message message) {String msg = new String(message.getBody());System.out.println("接收到的订单队列消息:" + msg);}
    }
    

十、延迟任务

  • 1、在rabbitmq中没有真正意义上的延迟队列任务,只是采用ttl+死信队列来完成的

  • 2、延迟任务主要用于场景

    • 文章定时发布
    • 订单多少分钟后取消支付
  • 3、延迟任务的结构图

    在这里插入图片描述

  • 4、创建一个延迟任务的队列

    @Configuration
    public class RabbitDlxExchangeConfiguration {...@Beanpublic Queue orderQueue() {return QueueBuilder.durable("order_queue").// maxLength(10).ttl(2000). // 过期时间deadLetterExchange("dlx_exchange"). // 死信队列的交换机deadLetterRoutingKey("dead"). // 死信队列的routingKeybuild();}
    }
    
  • 5、发送消息,观察可视化界面,时间到了就会进入到死信队列中

    @Test
    public void test06() throws InterruptedException {rabbitTemplate.convertAndSend("order_exchange", "info", "hello world");Thread.sleep(2000);
    }
    
  • 6、在死信队列中监听数据来改变数据库状态

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/226592.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

量子力学:科技前沿的探索与挑战

量子力学:科技前沿的探索与挑战 一、量子力学的魅力与挑战 量子力学是研究微观粒子如电子、光子等行为的物理学分支。与经典力学不同,量子力学描述了一个充满不确定性和概率性的世界。在这个世界里,粒子可以同时处于多个状态,只有当我们对其进行测量时,它才会“选择”一个…

[含泪解决]OSError: [Errno 99] Cannot assign requested address__踩坑记录——app.py绑定IP失败

踩坑记录下。 是这个样子的&#xff0c;前几天帮别人部署Python的Flask项目到云服务器上&#xff0c;然后在 app.run(host"xxx.xxx.xxx.xxx",port8080) 这行代码中&#xff0c;xxx.xxx.xxx.xxx代表我的IP地址&#xff0c;port代表我的端口号。 然后不是要部署到服…

想成为网络安全工程师该如何学习?

一、网络安全应该怎么学&#xff1f; 1.计算机基础需要过关 这一步跟网安关系暂时不大&#xff0c;是进入it行业每个人都必须掌握的基础能力。 计算机网络计算机操作系统算法与数据架构数据库 Tips:不用非要钻研至非常精通&#xff0c;可以与学习其他课程同步进行。 2.渗透技…

数据库中生成列的对比

简介 生成列&#xff08;虚拟列&#xff09;&#xff1a;在实际开发中&#xff0c;相对一个历史数据的表增加一个字段&#xff0c;增加下游报表&#xff0c;数据分析的可用性。常见的方法就是删表重建&#xff0c;或者使用ADD column 语法。如果是一个历史表&#xff0c;删…

电子学会C/C++编程等级考试2022年06月(三级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:制作蛋糕 小A擅长制作香蕉蛋糕和巧克力蛋糕。制作一个香蕉蛋糕需要2个单位的香蕉,250个单位的面粉,75个单位的糖,100个单位的黄油。制作一个巧克力蛋糕需要75个单位的可可粉,200个单位的面粉,150个单位的糖,150个单位的黄…

交叉编译

1. 交叉开发 交叉编译&#xff1a; 在电脑把程序编写 编译 调试好 再下载到嵌入式产品中运行 编译&#xff1a; gcc 之前编译环境和运行环境是一样的 交叉编译&#xff1a; 编译 把编译代码和运行分开 编译代码在虚拟机中 运行…

vue3(二)-基础入门之列表循环、数组变动检测、filter模糊查询、事件修饰符

一、列表循环 of 和 in 都是一样的效果 html代码&#xff1a; <div id"app"><ul><li v-for"item of datalist">{{ item }}</li></ul><ul><li v-for"item in dataobj">{{ item }}</li></u…

福州大学《嵌入式系统综合设计》 实验八:FFMPEG视频编码

一、实验目的 掌握使用算能平台进行视频编码的流程&#xff0c;包括开发主机环境与云平台的配置&#xff0c;视频编码程序的编写与理解&#xff0c;代码的编译、运行以及学习使用码流分析工具分析视频压缩码流等。 二、实验内容 搭建实验开发环境&#xff0c;编译并运行编码…

unity3d地图、地面跟着NPC跑

清除烘焙后&#xff0c;再 将地图、地面的设置为非静态。只设置NPC的寻路路面为静态&#xff0c;再烘焙

全面介绍SSO(单点登录)

全面介绍SSO&#xff08;单点登录&#xff09; SSO英文全称Single SignOn&#xff0c;单点登录。SSO是在多个应用系统中&#xff0c;用户只需要登录一次就可以访问所有相互信任的应用系统。它包括可以将这次主要的登录映射到其他应用中用于同一个用户的登录的机制。它是目前比…

学习知识回顾随笔(远程连接MySQL|远程访问Django|HTTP协议|Web框架)

文章目录 如何远程连接MySQL数据库1.创建用户来运行&#xff0c;此用户从任何主机连接到mysql数据库2.使用IP地址来访问MySQL数据库 如何远程访问Django项目Web应用什么是Web应用应用程序的两种模式Web应用程序的优缺点 HTTP协议&#xff08;超文本传输协议&#xff09;简介HTT…

monorepo多项目管理主流实现方式:1.learn + yarn/npm workspace 2.pnpm

npm域级包 随着npm包越来越多&#xff0c;而且包名也只能是唯一的&#xff0c;如果一个名字被别人占了&#xff0c;那你就不能再使用这个名字&#xff1b;假设我想要开发一个utils包&#xff0c;但是张三已经发布了一个utils包&#xff0c;那我的包名就不能叫utils了&#xff…