RabbitMQ不公平分发与预取值

1.分发简介

RabbitMQ不设置的话默认采用轮询方式分发消息,你一个我一个(公平);但实际生活中,由于处理速度不同,若还采用轮询方式分发会导致处理速度快的空等待,因此我们采用不公平分发

2.不公平分发

消费者这侧设置即可,以之前的Worker3和Worker4为例

2.1.Worker3

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(1);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

2.2.Worker4

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失, 放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(1);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

在这里插入图片描述

3.结果

启动Task3,Worker3,Worker4发现处理速度快的Worker3在Worker4还没处理完第一条消息时已处理了多条消息(能者多劳/强者多劳)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.预取值

不公平分发不管处理速度如何都是将消息分发给相对空闲的消费者,而预取值可以认为是未确认的消息缓冲区,该值时通道上允许未确认消息的最大值。一旦达到此值RabbitMQ在该通道上传递消息,除非至少有一个未应答的消息被ack.
还是只在消费者这侧修改,以之前的Worker3和Worker4为例

4.1.Worker3

Worker3处理速度快,设置预取值为5

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发* 5*/channel.basicQos(5);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

4.2.Worker4

Worker4处理速度慢,设置预取值为2

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1  不公平分发*/channel.basicQos(2);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

5.预取值结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
预取值也是一种不公平分发,不公平总是将消息转给相对空闲的消费者,预取值是提前设置好的每个消费者处理的数量,有点类似权重。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/295873.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过windows cng api 实现rsa非对称加密

参考: 1,使用 CNG 加密数据 - Win32 apps | Microsoft Learn 2,不记得了 (下文通过cng api演示rsa加密,不做原理性介绍) 相对于aes等对称加密算法,rsa加密算法不可逆性更强。非对称加密在通常情况下,使…

异步消息原理

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO 联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬 在日常开发中&#xff…

短链接技术解析:链接的简化之道

文章目录 前言起源实现原理常见短链接生成算法哈希算法自增计数随机生成基于关键字的生成 短链接的作用字符空间节省美化和简化个性化定制 实现一个简单的短链接服务个人简介 前言 大家在短信中是不是经常看到下面的短连接,简短易记: 看到这个时你是不是…

Python零基础教程5.0——无限画图下装逼

正方形的脸让我迷糊 引言开整完整代码1效果1完整代码2效果2完整代码3效果3 结尾 引言 哈哈,真巧 今天周末 有趣的人已经开始HAPPY 我只能码代码,写教程 不过,锻炼使我快乐! 少年的苦,中年的甘,老年的甜 …

Mybatis-TypeHandler类型转换器

文章目录 TypeHandler 接口TypeHandler 注册TypeHandler 查询别名管理总结 TypeHandler 接口 TypeHandler 这个接口 就是Mybatis的类型转换器 /*** author Clinton Begin*/ public interface TypeHandler<T> {// 在通过PreparedStatement为SQL语句绑定参数时&#xff0…

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…

80x86汇编—汇编程序基本框架

文章目录 First Program指令系统伪指令数值表达式 程序框架解释int 21 中断 通过一个基本框架解释各个指令和用处&#xff0c;方便复习。所以我认为最好的学习顺序就是先看一段完整的汇编代码程序&#xff0c;然后给你逐个逐个的解释每一个代码是干嘛用的。然后剩下的还有很多指…

LTE之接口协议

一、接口协议栈 接口是指不同网元之间的信息交互方式。既然是信息交互&#xff0c;就应该使用彼此都能看懂的语言&#xff0c;这就是接口协议。接口协议的架构称为协议栈。根据接口所处位置分为空中接口和地面接口&#xff0c;响应的协议也分为空中接口协议和地面接口协议。空…

反序列化漏洞原理、成因、危害、攻击、防护、修复方法

反序列化漏洞是一种安全漏洞&#xff0c;它允许攻击者将恶意代码注入到应用程序中。这种漏洞通常发生在应用程序从不安全的来源反序列化数据时。当应用程序反序列化数据时&#xff0c;它将数据从一种格式&#xff08;例如JSON或XML&#xff09;转换为另一种格式&#xff08;例如…

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数 The Flipped Classroom7 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#xff0c;重点&a…

Java经典面试题——手写快速排序和归并排序

题目链接&#xff1a;https://www.luogu.com.cn/problem/P1177 输入模板&#xff1a; 5 4 2 4 5 1快速排序 技巧&#xff1a;交换数组中的两个位置 a[l] a[l] a[r] - (a[r] a[l]); 稳定不稳定&#xff1f;:不稳定 注意找哨兵那里内循环的等于号不能漏&#xff0c;不然…

Linux 基础指令三

一、cat命令 默认是顺序查看&#xff0c;可同时查看多个文件&#xff0c;只能看普通文件&#xff0c;不能看文件以外 使用格式: cat [选项] 文件名 常用选项 -n显示行号-b跳过空白行编号-s将所有的连续的多个空行替换为一个空行&#xff08;压缩成一个空行&#xff0…