1.分发简介
RabbitMQ不设置的话默认采用轮询方式分发消息,你一个我一个(公平);但实际生活中,由于处理速度不同,若还采用轮询方式分发会导致处理速度快的空等待,因此我们采用不公平分发
2.不公平分发
在消费者这侧设置即可,以之前的Worker3和Worker4为例
2.1.Worker3
package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1 不公平分发*/channel.basicQos(1);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
2.2.Worker4
package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失, 放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1 不公平分发*/channel.basicQos(1);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
3.结果
启动Task3,Worker3,Worker4发现处理速度快的Worker3在Worker4还没处理完第一条消息时已处理了多条消息(能者多劳/强者多劳)
4.预取值
不公平分发不管处理速度如何都是将消息分发给相对空闲的消费者,而预取值可以认为是未确认的消息缓冲区,该值时通道上允许未确认消息的最大值。一旦达到此值RabbitMQ在该通道上传递消息,除非至少有一个未应答的消息被ack.
还是只在消费者这侧修改,以之前的Worker3和Worker4为例
4.1.Worker3
Worker3处理速度快,设置预取值为5
package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1 不公平分发* 5*/channel.basicQos(5);//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
4.2.Worker4
Worker4处理速度慢,设置预取值为2
package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 预取值* @Author: hong* @Date: 2023-12-18 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "prefetch_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");/** 不公平分发* 不设置或设置0 公平分发(轮询分发,RabbitMQ默认消息分发方式)* 1 不公平分发*/channel.basicQos(2);channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
5.预取值结果
预取值也是一种不公平分发,不公平总是将消息转给相对空闲的消费者,预取值是提前设置好的每个消费者处理的数量,有点类似权重。