Pulsar客户端消费模式揭秘:Go 语言实现 ZeroQueueConsumer

news/2024/11/14 5:42:52/文章来源:https://www.cnblogs.com/crossoverJie/p/18329716

前段时间在 pulsar-client-go 社区里看到这么一个 issue:

import "github.com/apache/pulsar-client-go/pulsar"client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650",
})
if err != nil {log.Fatal(err)
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic:             "persistent://public/default/mq-topic-1",SubscriptionName:  "sub-1",Type:              pulsar.Shared,ReceiverQueueSize: 0,
})
if err != nil {log.Fatal(err)
}// 小于等于 0 时会设置为 1000
const (  defaultReceiverQueueSize = 1000  
)
if options.ReceiverQueueSize <= 0 {  options.ReceiverQueueSize = defaultReceiverQueueSize  
}

他发现手动将 pulsar-client-go 客户端的 ReceiverQueueSize 设置为 0 的时候,客户端在初始化时会再将其调整为 1000.

if options.ReceiverQueueSize < 0 {  options.ReceiverQueueSize = defaultReceiverQueueSize  
}

而如果手动将源码修改为可以设置为 0 时,却不能正常消费,消费者会一直处于 waiting 状态,获取不到任何数据。

经过我的排查发现是 Pulsar 的 Go 客户端缺少了一个 ZeroQueueConsumerImpl的实现类,这个类主要用于可以精细控制消费逻辑。

If you'd like to have tight control over message dispatching across consumers, set the consumers' receiver queue size very low (potentially even to 0 if necessary). Each consumer has a receiver queue that determines how many messages the consumer attempts to fetch at a time. For example, a receiver queue of 1000 (the default) means that the consumer attempts to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to 0 essentially means ensuring that each consumer is only doing one thing at a time.

https://pulsar.apache.org/docs/next/cookbooks-message-queue/#client-configuration-changes

正如官方文档里提到的那样,可以将 ReceiverQueueSize 设置为 0;这样消费者就可以一条条的消费数据,而不会将消息堆积在客户端队列里。

客户端消费逻辑

借此机会需要再回顾下 pulsar 客户端的消费逻辑,这样才能理解 ReceiverQueueSize 的作用以及如何在 pulsar-client-go 如何实现这个 ZeroQueueConsumerImpl

Pulsar 客户端的消费模式是基于推拉结合的:


如这张图所描述的流程,消费者在启动的时候会主动向服务端发送一个 Flow 的命令,告诉服务端需要下发多少条消息给客户端。

同时会使用刚才的那个 ReceiverQueueSize参数作为内部队列的大小,将客户端下发的消息存储在内部队列里。

然后在调用 receive 函数的时候会直接从这个队列里获取数据。


每次消费成功后都会将内部的一个 AvailablePermit+1,直到大于 MaxReceiveQueueSize / 2 就会再次向 broker 发送 flow 命令,告诉 broker 再次下发消息。

所以这里有一个很关键的事件:就是向 broker 发送 flow 命令,这样才会有新的消息下发给客户端。

之前经常都会有研发同学让我排查无法消费的问题,最终定位到的原因几乎都是消费缓慢,导致这里的 AvailablePermit 没有增长,从而也就不会触发 broker 给客户端推送新的消息。

看到的现象就是消费非常缓慢。

ZeroQueueConsumerImpl 原理

下面来看看 ZeroQueueConsumerImpl 是如何实现队列大小为 0 依然是可以消费的。


在构建 consumer 的时候,就会根据队列大小从而来创建普通消费者还是 ZeroQueueConsumerImpl 消费者。

@Override  
protected CompletableFuture<Message<T>> internalReceiveAsync() {  CompletableFuture<Message<T>> future = super.internalReceiveAsync();  if (!future.isDone()) {  // We expect the message to be not in the queue yet  increaseAvailablePermits(cnx());  }  return future;  
}

这是 ZeroQueueConsumerImpl 重写的一个消费函数,其中关键的就是 increaseAvailablePermits(cnx());.

    void increaseAvailablePermits(ClientCnx currentCnx) {increaseAvailablePermits(currentCnx, 1);}protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {sendFlowPermitsToBroker(currentCnx, available);break;} else {available = AVAILABLE_PERMITS_UPDATER.get(this);}}}

从源码里可以得知这里的逻辑就是将 AvailablePermit 自增,达到阈值后请求 broker 下发消息。

因为在 ZeroQueueConsumerImpl 中队列大小为 0,所以 available >= getCurrentReceiverQueueSize() / 2永远都会为 true。

也就是说每消费一条消息都会请求 broker 让它再下发一条消息,这样就达到了每一条消息都精确控制的效果。

pulsar-client-go 中的实现

为了在 pulsar-client-go 实现这个需求,我提交了一个 PR 来解决这个问题。

其实从上面的分析已经得知为啥手动将 ReceiverQueueSize 设置为 0 无法消费消息了。

根本原因还是在初始化的时候优于队列为 0,导致不会给 broker 发送 flow 命令,这样就不会有消息推送到客户端,也就无法消费到数据了。

所以我们依然得参考 Java 的 ZeroQueueConsumerImpl 在每次消费的时候都手动增加 availablePermits

为此我也新增了一个消费者 zeroQueueConsumer

// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.  
// Notice: only non-partitioned topic is supported.  
// Default is false.  
EnableZeroQueueConsumer boolconsumer, err := client.Subscribe(ConsumerOptions{  Topic:                   topicName,  SubscriptionName:        "sub-1",  Type:                    Shared,  NackRedeliveryDelay:     1 * time.Second,  EnableZeroQueueConsumer: true,  
})if options.EnableZeroQueueConsumer {  options.ReceiverQueueSize = 0  
}

在创建消费者的时候需要指定是否开启 ZeroQueueConsumer,当开启后会手动将 ReceiverQueueSize 设置为 0.

// 可以设置默认值。
private int receiverQueueSize = 1000;

在 Go 中无法像 Java 那样在结构体初始化化的时候就指定默认值,再加上 Go 的 int 类型具备零值(也就是0),所以无法区分出 ReceiverQueueSize=0 是用户主动设置的,还是没有传入这个参数使用的零值。

所以才需要新增一个参数来手动区分是否使用 ZeroQueueConsumer


之后在创建 consumer 的时候进行判断,只有使用的是单分区的 topic 并且开启了 EnableZeroQueueConsumer 才能创建 zeroQueueConsumer


使用 PARTITIONED_METADATA 命令可以让 broker 返回分区数量。


func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {if state := z.pc.getConsumerState(); state == consumerClosed || state == consumerClosing {z.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")return nil, errors.New("consumer state is closed")}z.Lock()defer z.Unlock()z.pc.availablePermits.inc()for {select {case <-z.closeCh:return nil, newError(ConsumerClosed, "consumer closed")case cm, ok := <-z.messageCh:if !ok {return nil, newError(ConsumerClosed, "consumer closed")}return cm.Message, nilcase <-ctx.Done():return nil, ctx.Err()}}}

其中的关键代码:z.pc.availablePermits.inc()

消费时的逻辑其实和 Java 的 ZeroQueueConsumerImpl 逻辑保持了一致,也是每消费一条数据之前就增加一次 availablePermits

pulsar-client-go 的运行原理与 Java 客户端的类似,也是将消息存放在了一个内部队列里,所以每次消费消息只需要从这个队列 messageCh 里获取即可。

值得注意的是, pulsar-client-go 版本的 zeroQueueConsumer 就不支持直接读取内部的队列了。

func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage {  panic("zeroQueueConsumer cannot support Chan method")  
}

会直接 panic,因为直接消费 channel 在客户端层面就没法帮用户主动发送 flow 命令了,所以这个功能就只能屏蔽掉了,只可以主动的 receive 消息。

许久之前我也画过一个关于 pulsar client 的消费流程图,后续考虑会再写一篇关于 pulsar client 的原理分析文章。

参考链接:

  • https://github.com/apache/pulsar-client-go/issues/1223
  • https://cloud.tencent.com/developer/article/2307608
  • https://pulsar.apache.org/docs/next/cookbooks-message-queue/#client-configuration-changes
  • https://github.com/apache/pulsar-client-go/pull/1225

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/773531.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

配置 setting.py

配置 setting.py Django项目的配置主要通过settings.py文件完成。这个文件位于项目的根目录下(与manage.py同级)。为了让你的应用更加本地化,你可能需要修改一些设置,比如语言、时区等。打开settings.py文件,并找到以下设置进行修改: # setting.py# 语言设置(en-us:英文…

关于GPIO输出模式下3种频率的解释

配置GPIO输出模式时,有3种频率可以选择,2MHz、10MHz和50MHz,如下图:这三个频率说的是I/O口驱动电路的响应频率而不是输出信号的频率。芯片的内部做了多个响应频率不同的输出驱动电路,用户可以根据自己的需要选择合适的驱动电路。通过选取不同频率的输出驱动电路达到最佳的…

二手车交易预测模型笔记

一、梯度下降法 梯度下降法就是一种通过求目标函数的导数来寻找目标函数最小化的方法。梯度下降目的是找到目标函数最小化时的取值所对应的自变量的值,目的是为了找自变量X。 梯度:是一个矢量,其方向上的方向导数最大(意味着在这个方向上,函数的值增加最快。从图形上看,就…

Qt+OpenCascade开发笔记(二):windows开发环境搭建(二):Qt引入occ库,搭建基础工程模板Demo和发布Demo

前言Open CASCADE是由Open Cascade SAS公司开发和支持的开源软件开发平台,旨在为特定领域快速开发程序而设计。它是一个面向对象的C++类库,提供了丰富的几何造型、数据交换和可视化等功能,成为许多CAD软件的核心组件。  本篇描述搭建Qt开发occ环境过程。 Demo注意:用的是…

7/26admin投放端代码理解

xorm是一个数据库映射框架

chsap连接Mysql

前置操作 打开MySql服务 添加引用MySql.Data.dll 数据库查询数据 读取一条数据,直接运行就能看到打印台输出。1 using Mysql.Data.MyAqlClient;2 static void Main(string[] args)3 {4 string connStr="Database=test007;Data Source=127.0.0.1;port=3306;User Id=ro…

论文阅读:BERT-Based Chinese Relation Extraction for Public Security

模型框架 包含一个BERT模型层(嵌入+编码+池化->得到句子的特征向量)、一个Dropout层(防止过拟合)。基于BERT的预训练模型 BERT模型是通过注意力机制对训练集进行处理。然后,通过Embedding层和Encoder层加载预训练的词向量。 最后,Pooling 层使用 BERT 模型来训练两个句…

现在有什么赛道可以干到退休?

一个小小评论区惊现阿里和腾讯的两位大佬!他们干到退休应该是没什么问题,那你们呢?文中还有粉丝投稿的一次完整面试的面经,速来围观。最近,一则“90后无论男女都得65岁以后退休”的消息在多个网络平台流传,也不知道是真是假,好巧不巧今天刷热点的时候又看到一条这样的热…

vue.config.js的作用(修改webpack相关配置,读取入口文件,模版文件等)

在vue项目下有名为vue.config.js的文件,该文件可以配置webpack读取入口,模版文件的配置 在左测的配置都是可以修改的 上述在pages的属性对象中对entry入口进行修改了,将main.js修改为了lgx.js后续要更改webpack的配置可以直接在vue2配置参考上进行查看

【FMC155】基于VITA57.1标准的2路500MSPS/1GSPS/1.25GSPS 14位直流耦合AD采集FMC子卡模块

板卡概述FMC155是一款基于VITA57.1标准的,实现2路14-bit、500MSPS/1GSPS/1.25GSPS 直流耦合ADC同步采集FMC子卡模块。该模块遵循VITA57.1规范,可直接与FPGA载卡配合使用,板卡ADC器件采用ADI的AD9680芯片,该芯片具有两个模拟输入通道和两个JESD204B输出数据通道对,可用于高…