RabbitMQ: return机制

1. Return机制

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

 2.Java的实现方式

1.导入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>

2.生产者的实现方式

 采用Return机制来监听消息是否从exchange送到了指定的queue中

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;import java.io.IOException;public class SendRetrun {public static final String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数,可以为null*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启 return 机制//编写回调方法channel.addReturnListener(new ReturnListener() {//如果消息没有成功发送到队列,这个方法会被调用@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("====================ReturnListener==================");System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);System.out.println("properties:"+properties);System.out.println("body:"+new String(body,"utf-8"));System.out.println("====================ReturnListener==================");}});String message = "Hello doubleasdasda!";//生产者如何发送消息,使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字* other properties - 消息的其他属性,可以为null* body – 消息的内容,注意,要是有 字节数组*///注意:如果要使用生产者的return机制,需要在发送消息时,指定mandatory(强制性)为truechannel.basicPublish("", "sadnaas", true,null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(1000);//   关闭资源channel.close();conn.close();}
}

这个必须要加上才能让rutern返回机制生效 

 

 3.消费者的实现方式

package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private  final  static  String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();/*** 第一个参数队列名称* 第二个参数,耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数,可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中DeliverCallback deliverCallback =new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者,发送的消息的字节数组byte[] body = message.getBody();String msg = new String(body, "utf-8");//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});}}

3.整合springboot实现

1.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.yml配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-returns: true #开启return机制

3.RabbitMQ配置文件

package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);// 开启return机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("message:"+new String(message.getBody()));System.out.println("replyCode:"+replyCode);System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);}});return rabbitTemplate;}}

4.生产者的controller

    @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test1")public String test1(String msg,String routkey){System.out.println(msg);String exchangeName = "";//默认交换机String routingkey = routkey;//队列名字//生产者发送消息rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);return "ok";}

5.消费者写一个队列

   @RabbitListener(queues = "queueA")public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

6.消费者的配置文件

spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual  # 手动ackprefetch: 1 #等价于basicQos(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/103116.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

suning苏宁API接入说明(苏宁商品详情+关键词搜索商品列表)

API地址:https://o0b.cn/anzexi 调用示例&#xff1a;https://api-gw.onebound.cn/suning/item_get/?keytest_api_key& &num_iid0070134261/703410301&&langzh-CN&secret 参数说明 通用参数说明 version:API版本key:调用key,测试key:test_api_keyapi_na…

jmeter 线程组

在jmeter中&#xff0c;通过指定并发数量、启动延迟时间和持续时间&#xff0c;并组织示例&#xff08;Samplers&#xff09;在多个线程之间的执行方式&#xff0c;实现模拟并发用户的行为。 添加线程组&#xff1a; 在测试计划中&#xff0c;右键点击“添加” -> “Thread…

css flex:1;详解,配合demo效果解答

前言 给设置了display&#xff1a;flex的子组件设置了flex&#xff1a;1&#xff1b;就能让他填满整个容器&#xff0c;如果有多个就平均 flex&#xff1a;1&#xff1b;是另外三个样式属性的简写&#xff0c;等同 flex-grow: 0; flex-shrink: 1; flex-basis: auto;我们就针…

谈一谈冷门的C语言爬虫

目录 C语言写爬虫是可行的 C语言爬虫不受待见 C语言爬虫有哪些可用的库和工具 C语言爬虫示例 总结 在当今的编程世界中&#xff0c;C语言相比于一些主流编程语言如Python、JavaScript等&#xff0c;使用范围相对较窄。然而&#xff0c;尽管C语言在爬虫领域的应用并不常见&…

sentinel熔断报java.lang.reflect.UndeclaredThrowableException

背景&#xff1a;内部要进行应用jdk&springboot升级&#xff0c;因此也需要将Spring Cloud Hystrix 替换成alibaba sentinel。 依赖 <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</a…

芒果app逆向分析 (二)

接着上文,我们发现请求后的结果是加密的状态,我们需要解密成明文看数据。 前面提到无法使用frida,直接上xposed. 直接就hook出来了?? key = "xkSHHy5DQzYwbZS32zJBDyrHCHWMDGDk" iv = "4yXhd2Ta4m6dif54"堆栈记录下,方便后续使用: 调用堆栈:at ja…

Mysql--事务

事务 开始之前&#xff0c;让我们先想一个场景&#xff0c;有的时候&#xff0c;为了完成某个工作&#xff0c;需要完成多种sql操作 比如转账 再比如下单 第一步 我的账户余额减少 第二步 商品的库存要减少 第三步 订单表中要新增一项 事务的本质&#xff0c;就是为了把多个操…

Unity UGUI(二)核心组件

Unity Canvas相关知识学习 文章目录 Unity Canvas相关知识学习1. Canvas&#xff1a;1.1 Render Mode1.2 多个Canvas的显示顺序 2.Canvas Scaler&#xff1a;屏幕分辨率自适应2.1 UI Scale Mode 3. EventSystem4. Standalone Input Module5. Graphic Raycaster&#xff1a;图形…

dantax参数调优

dantax参数调优 1.speed调优 可能会导致数据倾斜 处理的速度不同&#xff0c;可能会导致job非常慢 举例子&#xff0c;比如总限速是每秒100条record&#xff0c;其中第一个channel速度是每秒99条record&#xff0c;第二个channel是每秒1条record&#xff0c;加起来是每条100条…

初学Python记

Python这个编程语言的大名当然听说过了呀&#xff0c;这几年特别火&#xff0c;火的一塌涂地。大家可以回忆一下&#xff1a;朋友圈推荐的广告里经常可以看见python的网课广告。 本学期&#xff0c;学校开设了python课程&#xff0c;这几天学习了一下入了一下门&#xff0c;感…

将PyCharm中的终端运行前面的PS修改成当前环境

最近使用Pycharm中的Terminal来pip安装一些pakage&#xff0c;发现Terminal运行前面的显示的是PS&#xff0c;然后输入安装指令报错。“python无法将“pip”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。” 解决方法&#xff1a; 只需要在pycharm的设置中修改一些termi…

CSS 滚动驱动动画 scroll()

CSS 滚动驱动动画 scroll() animation-timeline 通过 scroll() 指定可滚动元素与滚动轴来为容器动画提供一个匿名的 scroll progress timeline. 通过元素在顶部和底部(或左边和右边)的滚动推进 scroll progress timeline. 并且元素滚动的位置会被转换为百分比, 滚动开始被转化为…