微服务——服务异步通讯(MQ高级)

MQ的一些常见问题

消息可靠性

生产者消息确认

返回ack,怎么感觉这么像某个tcp的3次握手。

使用资料提供的案例工程.

在图形化界面创建一个simple.queue的队列,虚拟机要和配置文件里面的一样。

 SpringAMQP实现生产者确认

AMQP里面支持多种生产者确认的类型。

simple是同步等待模式,发了消息之后就一直等待结果,可能会导致代码阻塞。

correlated是异步回调模式,像前段的ajax请求的回调函数。

ApplicationContextAware是bean工厂通知。会在Spring容器创建完后来通知并传一个spring容器到下面的方法。然后从中取到rabbitTemplate的bean并设置ReturnCallback。 

ReturnCallback:消息到了交换机,路由时失败了没有到达消息队列

ConfirmCallback:消息连交换机都没到。

这个不像ReturnCallback只能配置一个,这个可以在每次发消息时设置。

这里在发送消息时多了一个correlationData,这是在配置开关选择的confirm类型为correlated。里面封装了消息的唯一id和callback.

callback里面的result是成功的回调函数,ex是失败的回调函数。这里的失败是指回调都没收到。

实现

先是在生产者的配置文件里要加上前面的配置j

编写returnCallback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//如果有需要的话,可以重发消息});}
}

编写ConfirmCallback

这里先要在图形界面手动将交换机和消息队列做绑定 

    @Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {//1.准备消息String message = "hello, spring amqp!";//2.准备correlationData//2.1消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//2.2准备ConfirmCallbackcorrelationData.getFuture().addCallback(result -> {//判断结果if(result.isAck()){//ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else{//NACKlog.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());}}, ex -> {//记录日志log.error("消息发送失败!",ex);//重发消息});//3.发送消息rabbitTemplate.convertAndSend("camq.topic", "simple.test", message,correlationData);}

测试得到

成功的测试情况

 

失败的测试情况

投递交换机失败,交换机不存在

投递队列失败,队列不存在

 

消息持久化

这里通过重启rabbitmq容器发现消息都不见了可以确认,rabbitmq和redis一样都是内存运行的。

甚至我手动加上的消息队列和绑定关系都不见了。这里消息队列不见是因为前面创建队列时选择的是Transient,不持久化。系统默认的交换机都还在,是因为durable为true,持久化。

创建队列或交换机的时候可以设置Durability为Durable即可持久化。

在消费者代码中进行交换机和队列的创建,然后可以看见如下持久化的交换机和队列.

@Configuration
public class CommonConfig {@Beanpublic DirectExchange simpleExchange(){return new DirectExchange("simple.direct",true,false);}@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("simple.queue").build();}
}

手动发送一条消息进行测试

重启之后消息还是消失了。

要想让消息持久化,需要在发送消息时指定。

@Testpublic void testDurableMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,pop".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.build();//2.发送消息rabbitTemplate.convertAndSend("simple.queue",message);}

 重启之后消息就持久化了。

通常在springamqp中这些都是持久化的。

消费者消息确认

在none模式下,消费者拿到消息都就报异常了,然后消息也没了。

在auto模式下,消费者拿到消息后给mq报了个unack,然后消息会重新投递,消费者继续拿消息,tmd,死循环了。 但是这里消息就不会消失了。

@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");System.out.println(1/0);log.info("消费者处理消息成功!");}

消费失败重试机制

重试次数耗尽之后会将消息丢弃。

消费者失败消息处理策略

 在消费者代码中

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue",true);}@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

 重新发送消息进行测试,可以看见重试次数耗尽之后就送到了死信队列了。

在里面将异常的堆栈信息也包含了. 

 

死信交换机

初识死信交换机

区别在于,上一个是消费者失败之后寻找交换机路由到error队列,这个是退回到队列,再指定交换机,最后路由。

TTL

这个的应用场景比如说订单超时未支付然后自动取消。

实现  

          

准备 代码部分

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "dl.queue",durable = "true"),exchange=@Exchange(name="dl.direct"),key = "dl"))public void listenDlQueue(String msg){log.info("接收到 dl.queue的延迟消息:{}",msg);}

@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding simpleBinging(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
}

 测试代码

    @Testpublic void testTTLMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.build();//2.发送消息rabbitTemplate.convertAndSend("ttl.direct","ttl",message);//3.记录日志log.info("消息成功发送!");}

10s之后在消费者那里就可以看见

 

 然后这里会以短的优先,5s后消费者就可以收到消息。

延迟队列

1.重装rabbitmq容器 

这个插件需要找到mq内部的插件文件夹,所以需要在创建容器的时候进行数据卷挂载。

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

 2.安装DelayExchange插件

官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。 

查看挂载的数据卷.

docker volume inspect mq-plugins

接下来的看着好麻烦,以后看文档吧.

还真的麻烦的一批,真不想再搞这玩意,文件搞来搞去。

不知道为什么,挂载数据卷时一直报错,不能用自己定义的文件夹来挂载。

 

 

在消费者中如下声明

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "delay.queue",durable = "true"),exchange=@Exchange(name="delay.direct",delayed = "true"),key = "delay"))public void listenDelayQueue(String msg){log.info("接收到 delay.queue的延迟消息:{}",msg);}

 在生产者中如下定义

    @Testpublic void testSendDelayMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.setHeader("x-delay",5000).build();//2.准备correlationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//3.发送消息rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData);log.info("发送消息成功");}

测试结果如下 成功实现延迟5秒。但是会被报错,理论上说交换机应该立即转发,不会延迟,但是这里的延迟交换机可以帮忙保存消息延迟发送,所以这里才会报错,not_router,消息没有到达队列

 为了解决这个报错,需要修改生产者代码

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判断是否是延迟消息if (message.getMessageProperties().getReceivedDelay()>0) {//是一个延迟消息,忽略错误提示return;}//记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//如果有需要的话,可以重发消息});}
}

惰性队列

消息堆积问题

问题解决

消费者中声明两个队列。 

@Configuration
public class LazyConfig {@Beanpublic Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy().build();}@Beanpublic Queue normalQueue(){return QueueBuilder.durable("normal.queue").build();}
}

 测试,准备两个队列之后分别向两个队列发消息。

    @Testpublic void testLazyMessage(){for(int i=0;i<1000000;i++){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的.build();//3.发送消息rabbitTemplate.convertAndSend("lazy.queue", message);}}@Testpublic void testnormalMessage(){for(int i=0;i<1000000;i++){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的.build();//3.发送消息rabbitTemplate.convertAndSend("normal.queue", message);}}

可以看见惰性队列的消息全部到paged out 刷出磁盘了?????、,为什么非惰性队列的也是刷出磁盘了。

 

MQ集群

集群个屁,不搞了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/284821.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jmeter 性能-负载阶梯场景

1、安装阶梯测试的第三方插件->搜jpgc 选项-JMeter Plugins Manager -搜jpgc 空格&#xff0c;然后安装 2、脚本-线程组选jpgc Stepping Thread Group 最终并发数为100&#xff0c;并发数从0开始&#xff0c;5秒内增加10个并发数&#xff0c;增加10个后持续30s&#xff0c;…

[C++]——STL简介

带你了解c的STL 前言&#xff1a;一、什么是STL?二、STL有什么版本&#xff1f;三、STL的组件有哪些&#xff1f;四、如何学习STL?五、总结 前言&#xff1a; 我写这个博客&#xff0c;是为了在学习过程中能够更加有条理&#xff0c;更加全面&#xff0c;更加清晰的学习STL。…

Pipelined-ADC设计一:序言

现在是2023年12月18日&#xff0c;准备开新帖&#xff0c;设计一个 流水线型 模数转换器&#xff08; Pipelined-ADC &#xff09;。记录帖&#xff0c;后续会放在咸鱼。同步记录&#xff0c;谨防盗用。 初定指标&#xff1a;12位50Mhz&#xff0c;采用2.5bit每级结构&#xff…

教育机构小程序管理系统的全方位优化

随着互联网的快速发展&#xff0c;线上教育也日益受到人们的关注和欢迎。为了满足广大学生和家长的需求&#xff0c;教育机构纷纷开发出自己的小程序管理系统。本文将详细介绍如何使用乔拓云平台&#xff0c;一键开发出自己的教育机构小程序管理系统。 1.进入乔拓云后台 首先&…

C# 使用FluentHttpClient请求WebApi

写在前面 FluentHttpClient 是一个REST API 异步调用 HTTP 客户端&#xff0c;调用过程非常便捷&#xff0c;采用流式编程&#xff0c;可以将所有请求所需的参数一次性发送&#xff0c;并直接获取序列化后的结果。 老规矩从NuGet上安装该类库&#xff1a; 这边一定要认准是 P…

检测车牌的SIFT特征并匹配

# 代码5-14 检测车牌的SIFT特征并匹配 import cv2img1 cv2.imread(../data/plate.jpg) img2 cv2.imread(../data/car.jpg)sift cv2.SIFT_create() # 利用sift.detectAndCompute()函数找到特征点&#xff0c;计算描述符&#xff1b; kp1, des1 sift.detectAndCompute(img1, …

Git提交前的必备神器——自动清除调试语句脚本

说在前面 不知道大家有没有遇到这样一种情况&#xff0c;平时在写代码调试时有时候会使用到debugger&#xff0c;可能大部分时间在提交代码前会记得把debugger先删除&#xff0c;但可能也会存在将debugger提交上去的情况&#xff0c;那我们该怎么防止出现这种情况呢&#xff1f…

C语言—每日选择题—Day53

指针相关博客 打响指针的第一枪&#xff1a;指针家族-CSDN博客 深入理解&#xff1a;指针变量的解引用 与 加法运算-CSDN博客 第一题 1. 有以下程序&#xff0c;输出的结果为&#xff08;&#xff09; #include <stdio.h> int main() {char a H;a (a > A &&…

【03】GeoScene创建海图或者电子航道图数据

1 配置Nautical属性 1.1 管理长名称 长名称&#xff08;LNAM&#xff09;是一个必要的对象标识符&#xff0c;是生产机构&#xff08;AGEN&#xff09;、要素识别号码&#xff08;FIDN&#xff09;和要素识别子项&#xff08;FIDS&#xff09;组件的串联。这三个子组件用于数…

MidJourney笔记(8)-ask和blend命令

经过前面的课程介绍,我相信大家对MidJourney有一定的认识,接下来就给大家介绍一下MidJourney的常用命令。 /ask 获取问题答案。 我一开始以为是随便问题都可以问,最后发现只能回答MidJourney相关的问题。 我们先试试一些日常生活问题: 今天天气如何? 以为它不会识别中文,…

蓝桥杯time模块常用操作

#导入time模块import time #获取时间戳 start_time time.time () print ( "start_time ", start_time) time .sleep ( 3) end_time time.time () print ( "end_time ", end_time)#计算运行时间 print("运行时间 { :.0f } ".format(end_time …

计算机毕业设计 基于SpringBoot的高校毕业与学位资格审核系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…