推或拉? rabbitMQ 消费模式该如何选择

系列文章目录

消息队列选型——为什么选择RabbitMQ
RabbitMQ 五种消息模型
RabbitMQ 能保证消息可靠性吗


文章目录

  • 系列文章目录
  • 前言
  • 一、推拉两种模式的概念
  • 二、推模式的使用及优势
    • 1. 使用
    • 2. 优劣
  • 三、拉模式的使用及优势
    • 1. 使用
    • 2. 优劣
  • 四、消费端Ack模式与Qos
    • 1. Ack模式
    • 2. Qos 服务质量
  • 五、总结


前言

在前面的选型对比中,我们提到了rabbitMQ同时支持推和拉的消息投递方式,那么什么是消息的推和拉?我们又该如何选择呢?今天我们就一起来看下吧


一、推拉两种模式的概念

MQ 是一个非常重要的消息传递架构,它可以实现解耦并且提高系统的可靠性和吞吐量。 在很多MQ组件中,消息可以通过推和拉模式进行传递。

  • 推模式
    在推模式下,当一个生产者发布消息到队列时,队列会立即将这条消息发送给所有订阅了该队列的消费者
  • 拉模式
    在拉模式下,当生产者发布消息到队列时,队列不会立即发送消息给消费者,而是等待消费者请求消息后才发送

二、推模式的使用及优势

1. 使用

代码如下(示例):

package com.zhanfu.springboot.demo.mq;
import com.rabbitmq.client.*;
import java.io.IOException;public class PushConsumer {private final static String QUEUE_NAME = "myqueue";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received '" + message + "'");}};// 开始消费消息,第二个参数为是否自动ackchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

我们定义了一个consumer 消费者,然后把该消费者使用basicConsume方法来订阅某个队列的消息。当有消息到达队列时,consumer 里的handleDelivery方法就会被调用

2. 优劣

  1. 优势:这种方式可以实现实时通信
  2. 劣势:如果消费者的处理能力跟不上生产者的速度,就会在消费者处造成消息堆积

三、拉模式的使用及优势

1. 使用

代码如下(示例):

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;public class RabbitMQPullConsumer {private static final String QUEUE_NAME = "queue_name";private static final String HOST_NAME = "host_name";private static final String USERNAME = "username";private static final String PASSWORD = "password";public static void main(String[] args) throws IOException, TimeoutException {// 创建连接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST_NAME);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 消费消息while (true) {// 手动从队列中获取一条消息,第二个参数为是否自动ackGetResponse response = channel.basicGet(QUEUE_NAME, false);if (response == null) {// 没有消息,则等待一段时间再继续检查try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {String message = new String(response.getBody(), "UTF-8");System.out.println("Received message: " + message);channel.basicAck(response.getEnvelope().getDeliveryTag(), false); // 手动Ack}}}
}

如果获取的GetResponse对象为null,则表示队列中没有消息。我们将等待一段时间(这里是1秒),然后再继续检查队列中是否有新消息。

如果获取的GetResponse对象不为null,则表示有新消息。我们将从GetResponse对象中提取消息内容,然后输出它。如果我们设置basicGet()方法的第二个参数为true,则表示自动ack,即在获取消息后直接将消息从队列中删除。

这是一个基本的拉模式的RabbitMQ消费者实现。当然,生产上如果采用拉模式,我们更推荐使用多个线程异步的方式处理,一个线程负责循环拉取消息后,存入一个长度有限的阻塞队列,另一个/一些线程从阻塞队列中取出消息,处理完一条则手动Ack一条,这样更有效率。

2. 优劣

  1. 优势:消费端可以按照自己的处理速度来消费,避免产生在消费端产生消息堆积。
  2. 劣势:消息传递方面可能会有一些延迟,当处理速度长时间小于消息发布速度时,容易造成大量消息堆积在rabbitMQ服务器,一些时间敏感的队列,可能会使内部的消息失效

四、消费端Ack模式与Qos

1. Ack模式

我们在上面的代码中,不难发现,不管是推模式还是拉模式,方法的入参中都有一个布尔变量

// 推模式,直接订阅
channel.basicConsume(QUEUE_NAME, true, consumer);
// 拉模式,一次拉取一条
channel.basicGet(QUEUE_NAME, true);

这个布尔变量其实就是是否自动Ack,其实我们在《RabbitMQ 能保证消息可靠性吗》一文中就提到过,这是一种消息确认机制:

  • 自动ACK
    即意味着,我们收到消息时就会通知RabbitMQ服务器,服务器就会将该消息已经被消费,进而删除。
  • 手动ACK
    -即意味着在某个我觉得可以通知RabbitMQ服务器时,我才会通知。

那么两种模式我们该怎么取舍?

从实际生产的角度,我强烈建议将Ack模式设置为手动,这样可以保证消费者处理消息失败时,消息不会立即从队列中删除,而是需要重新分配给其他消费者,增强了系统的容错性和可靠性。同时,建议在消费者处理消息时,对消息进行异常处理,确保消息能够正确地被处理

2. Qos 服务质量

在rabbitMQ的API中,我们不难发现有一项叫做 Qos(quality of service—— 服务质量) 的参数设置
在这里插入图片描述

These settings impose limits on the amount of data the server will deliver to consumers before requiring acknowledgements.Thus they provide a means of consumer-initiated flow control.
这些设置对服务器在接收到Ack之前将传递给消费者的数据量进行了限制。因此,它们提供了一种由消费者发起的流量控制方式。

通俗的讲,这个Qos参数是配合Ack 用来控制消费端的流量的(即限流:自动ACK不需要设置Qos;手动ACK模式时,则可以设置Qo,控制消费者处理消息的速度,它拥有三个参数

  1. prefetchSize:服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0
  2. prefetchCount:服务器将传递的最大消息条数,如果不受限制,则为0
  3. global:是否将该设置应用到整个信道,还是仅此一个消费者

在上面拉模式的代码中,我们就使用了一个 channel.basicQos(1); ,也是最常用的限制方法,即以消息条数为单位限制,其实设置的就是最大消息条数

    /*** Request a specific prefetchCount "quality of service" settings* for this channel.* <p>* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).** @param prefetchCount maximum number of messages that the server*                      will deliver, 0 if unlimited* @throws java.io.IOException if an error is encountered* @see #basicQos(int, int, boolean)*/void basicQos(int prefetchCount) throws IOException;

我们可以设置QoS,控制消费者处理消息的速度。通过合理设置预取计数或预取字节数,可以确保消费者处理消息的速度与RabbitMQ服务器发送消息的速度相匹配,避免队列中积压过多的未确认消息需要注意的是,这个prefetchCount的取值是一个经验值,太大容易导致消费端内存消耗过高,太小则效率低下,一般建议在100以下


五、总结

综合来看,推模式适合实时通信且生产者和消费者的速度相当的场景,而且对于利于压榨出消费者端的处理潜力;拉模式适合大量数据的场景,并且可以更好地控制消息的消费进度。但是,实际应用中更多地是将两种模式结合使用,以达到更好的效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/1700.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RobotFramework +appium实现Android自动化

环境准备 1、已安装python37版本&#xff08;SDK、JDK均已安装完成&#xff0c;且环境变量都配置好了&#xff09;。 2、已安装robotframework。 3、已安装安卓模拟器&#xff08;本文使用夜神模拟器&#xff09;。 4、安装appium&#xff08;下载地址&#xff1a;http://6…

HOT19-螺旋矩阵

leetcode原题链接&#xff1a;螺旋矩阵 题目描述 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5]示例…

NAPT之NAT地址池、NAPT之easy-ip、NAT-Server

NAPT之NAT地址池 拓扑 需求 实现企业内网主机&#xff08;PC1-PC4&#xff09;访问公网网站服务器&#xff08;Server1&#xff09; 配置步骤 第一步&#xff1a;给PC1-PC4/Server1配置接口IP地址&#xff0c;掩码&#xff0c;网关 第二步&#xff1a;R1配置默认路由 -边界…

【强化学习】常用算法之一 “Q-learning”

作者主页&#xff1a;爱笑的男孩。的博客_CSDN博客-深度学习,活动,python领域博主爱笑的男孩。擅长深度学习,活动,python,等方面的知识,爱笑的男孩。关注算法,python,计算机视觉,图像处理,深度学习,pytorch,神经网络,opencv领域.https://blog.csdn.net/Code_and516?typeblog个…

【期末不挂科 学习数据结构】

期末不挂科 学习数据结构 第一章绪论1.1数据结构的基本概念1.1.1基本概念和术语1.数据2.数据元素3.数据对象4.数据类型5.数据结构 1.1.2数据结构三要素1.数据的逻辑结构2.数据的存储结构3.数据的运算 第一章绪论 1.1数据结构的基本概念 1.1.1基本概念和术语 1.数据 数据是信…

cmd中输入npm install,回车——安装node modules依赖,出现报错的【解决方法】

目录 1.正常情况是&#xff1a; 2.当前问题&#xff1a; 3.解决方法&#xff1a; 当拿到一个前端项目的代码文件夹的时候,想要启动项目。 如果项目的代码文件夹里面没有node modules文件夹&#xff1a; 需要打开cmd&#xff0c;然后在里面输入 npm install &#xff08;可…

Pytorch安装

一、查看 CUDA 版本 使用pip install torch (2.0.0版本)&#xff0c;这样安装的torch是直接运行在CPU上的&#xff0c;想要使用GPU版本需要使用对应的cuda版本。 在安装pytorch时我们需要选择对应CUDA版本的pytorch&#xff0c;那如何查看CUDA版本呢&#xff1f; 1.NVIDIA方式…

Java 被挤出前三。。

TIOBE 2023 年 06 月份的编程语言排行榜已经公布&#xff0c;官方的标题是&#xff1a;Python 还会保持第一吗&#xff1f;&#xff08;Will Python remain number 1?&#xff09; 在过去的 5 年里&#xff0c;Python 已经 3 次获得 TIOBE 指数年度大奖&#xff0c;这得益于…

AD从原理图到PCB超详细教程

AD超详细教程 前言一、建立一个工程模板二、原理图1.设计原理图。2.使用AD自带库和网上开源原理图库3.画原理图库4.编译原理图 三、PCB1.确定元器件尺寸大小2.绘制PCB Library①使用元器件向导绘制元件库②原理图与PCB的映射 3.绘制PCB①更新PCB②调整元件位置③布线④漏线检查…

多线程7——线程池各参数的意义+四种拒绝策略+代码模拟实现

文章目录 前言一、线程池是什么&#xff1f;二、线程池的使用1.代码使用线程池2.剖析线程池3.线程池的拒绝策略 三、代码模拟实现线程池总结 前言 本人是一个刚刚上路的IT新兵,菜鸟!分享一点自己的见解,如果有错误的地方欢迎各位大佬莅临指导,如果这篇文章可以帮助到你,劳请大…

Python 基本数据类型(五)

文章目录 每日一句正能量List&#xff08;列表&#xff09;结语 每日一句正能量 营造良好的工作和学习氛围&#xff0c;时刻牢记宗旨&#xff0c;坚定信念&#xff0c;胸怀全局&#xff0c;埋头苦干&#xff0c;对同事尊重信任谅解&#xff0c;发扬团体协作精神&#xff0c;积极…

使用Xshell服务器跑程序,用pycharm连接服务器远程开发

目标&#xff1a; 1.使用Xshell在服务器上创建自己项目需要的虚拟环境 2.用pycharm实现远程服务器的连接&#xff08;这样就可以在本地debug或者写代码&#xff0c;然后再用xshell在服务器上跑&#xff09; 一、使用Xshell在服务器上创建自己项目需要的虚拟环境 1.打开Xshe…