Pulsar服务端处理消费者请求以及源码解析

引言

处理读写是Pulsar服务端最基本也是最重要的逻辑,今天就重点看看服务端是如何处理的读请求也就是消费者请求

正文

Pulsar服务端处理消费者请求的流程大致如下图所示
在这里插入图片描述

  1. 消费者通过TCP向服务端发起消息拉取请求
  2. Broker会根据请求中携带的ID来获取在服务端对应的Consumer对象,每个Consumer对象都有一个对应的游标对象,这个游标对象会调用Dispatcher来做数据查询的操作
  3. Dispatcher会先尝试读取缓存,这个缓存是个跳表结构并且节点数据是存在堆外内存中的,如果命中则直接返回
  4. 未命中缓存的话会通过Bookkeeper客户端去读取Bookkeeper中的数据,读取到后会通过跟客户端所建立的TCP连接将查到的数据发送过去

整体流程就是这四步,接下来就让咱们看看Pulsar的代码实现吧

处理消费请求

Broker处理的请求基本都是从ServerCnx这里开始的,因为它实现了Netty的ChannelInboundHandlerAdapter类,因此所有TCP的数据写进来时最终都是ServerCnx进行处理的,处理消费的请求时从handleFlow方法开始,因此从这里进行跟踪

    protected void handleFlow(CommandFlow flow) {....//从当前Broker维护的Consumer列表中获取客户端对应服务端的Consumer对象CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {Consumer consumer = consumerFuture.getNow(null);if (consumer != null) {//传入客户端配置的拉取条数,最大默认不会超过1000consumer.flowPermits(flow.getMessagePermits());} else {log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());}}}public void flowPermits(int additionalNumberOfMessages) {....// 处理消息拉取请求,继续跟进去看看subscription.consumerFlow(this, additionalNumberOfMessages);}public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {this.lastConsumedFlowTimestamp = System.currentTimeMillis();//最终调用者是dispatcherdispatcher.consumerFlow(consumer, additionalNumberOfMessages);}

Dispatcher是个接口,在这里选择PersistentDispatcherSingleActiveConsumer的实现进行跟踪

    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {//作为一个任务交给线程池处理executor.execute(() -> internalConsumerFlow(consumer));}private synchronized void internalConsumerFlow(Consumer consumer) {//进行消息的读取readMoreEntries(consumer);}private void readMoreEntries(Consumer consumer) {....//通过游标进行数据的读取cursor.asyncReadEntriesOrWait(messagesToRead,bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());}

PersistentDispatcherSingleActiveConsumer最终会调用ManagedCursorImpl进行数据的读取,这里要注意PersistentDispatcherSingleActiveConsumer实现了回调接口,也就是它自身实现了数据读取成功的处理逻辑。这里它将自己作为参数传给下一层用于在读取成功后进行回调处理,这也是最常见的异步回调设计方式。

继续跟踪ManagedCursorImpl的数据读取逻辑

    public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);}public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,Object ctx, PositionImpl maxPosition,Predicate<PositionImpl> skipCondition) {....// 读取数据asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,maxPosition, skipCondition);}public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {// 封装第二层回调OpReadEntry op =OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);//核心方法,从这里进去读取ledger.asyncReadEntries(op);
}void asyncReadEntries(OpReadEntry opReadEntry) {....internalReadFromLedger(currentLedger, opReadEntry);....}private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {....// 进行数据读取asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);}protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,Object ctx) {if (config.getReadEntryTimeoutSeconds() > 0) {....// 封装第三层回调ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,opReadEntry, readOpCount, createdTime, ctx);lastReadCallback = readCallback;// 尝试从缓存中读取数据,继续跟踪进去entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),readCallback, readOpCount);} else {entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,ctx);}}

entryCache有RangeEntryCacheImpl和EntryCacheDisabled两种实现,EntryCacheDisabled相当于不走缓存直接查Bookkeeper,而RangeEntryCacheImpl是会尝试去读取Broker自身的缓存,这里跟着RangeEntryCacheImpl看看实现

    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback callback, Object ctx) {//跟进去看asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);}void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback callback, Object ctx) {//一样,继续跟踪看asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);}void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {....// 缓存实现是ConcurrentSkipListMap value是堆外内存Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);....//如果全部命中缓存则直接返回,否则往下走// 从bookkeeper读pendingReadsManager.readEntries(lh, firstEntry, lastEntry,shouldCacheEntry, callback, ctx);}void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {....//从Bookkeeper进行数据的读取CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,lastEntry, shouldCacheEntry);}CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,long firstEntry, long lastEntry, boolean shouldCacheEntry) {....//这里的lh其实就是Bookkeeper的客户端对象LedgerHandleCompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)....}

到这里基本就到了Bookkeeper的内部逻辑了,Bookkeeper相关的后面在单独进行分析。读取逻辑基本就到这了,肯定会有伙伴疑惑🤔,读到数据后怎么将数据发给客户端/消费者呢?请继续往下看

回调处理

刚刚进行代码跟踪的时候应该都看到流程中封住了好几个回调函数,这里就拎最重要的也就是PersistentDispatcherSingleActiveConsumer进行讨论,这里直接从它的回调方法readEntriesComplete进行跟踪

    public void readEntriesComplete(final List<Entry> entries, Object obj) {//作为任务放到线程池去执行executor.execute(() -> internalReadEntriesComplete(entries, obj));}private synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {....//分派数据到消费者dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo, epoch);}protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,SendMessageInfo sendMessageInfo, long epoch) {//将查到的消息通过TCP写到消费者端currentConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),redeliveryTracker, epoch)....}public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,EntryBatchIndexesAcks batchIndexesAcks,int totalMessages, long totalBytes, long totalChunkedMessages,RedeliveryTracker redeliveryTracker, long epoch) {....//通过PulsarCommandSenderImpl进行消息发送,继续跟踪进去Future<Void> writeAndFlushPromise = cnx.getCommandSender().sendMessagesToConsumer(....);....}public ChannelPromise sendMessagesToConsumer(....) {....//通过Netty的TCP将查到的消息数据写到客户端ctx.write(....);....}

到这里基本上服务端的事情就结束了,剩余的其他几个回调函数感兴趣的伙伴可以自行跟踪。

总结

可以看到Pulsar里大量使用了异步回调处理,这样的设计在高并发场景大幅提升服务的性能,尽可能的避免了存在瓶颈的地方。不过带来的另一影响是,代码跟踪起来相对来说容易“迷路”,因此掌握好异步设计的逻辑是很有必要的,可以帮助我们更好的跟踪Pulsar的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/588363.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为交换机配置指引(包含安全配置部分)以 S5735S-L48T4S-A1 配置为例

华为S5735S-L48T4S-A1 是一款千兆以太网交换机: 端口结构: 48个10/100/1000BASE-T以太网端口和4个千兆SFP光接口供电方式: 交流电源背板带宽: 432Gbps包转发率: 87/166Mpps机箱高度: 1U重量: 2.76kg(不含包材)功耗: 典型功耗为43.3W接口: 48个10/100/1000BASE-T以太网电接口…

Incus:新一代容器与虚拟机编排管理引擎

Incus是什么&#xff1f; Incus是一个用于编排管理应用型容器、系统型容器及虚拟机实例的管理工具。它是对 Canonical LXD 的继承与发展&#xff0c;引入了更多的存储驱动支持。 Incus项目的产品地址&#xff1a;Linux Containers - Incus - Introduction 在 LXC-Incus 项目…

FebHost:人工智能时代的新宠儿.AI域名

近年来,人工智能技术在各行各业迅猛发展,正在深刻改变着我们的生活。作为AI领域的专属域名,.AI域名正成为越来越多企业和个人的首选。 那么,.AI域名到底是什么呢?它是一种特殊的顶级域名(Top-Level Domain, TLD),于2013年由 安哥拉政府正式退出。与其他通用顶级域名如.com、.…

QT网络调试助手

QT网络调试助手 1.开发流程 2.QTtcp服务器   1.1 服务端数据读取   1.2 服务端发送数据-所有客户端   1.3 服务端自动刷新ip地址   1.4 服务端检测客户端断开状态   1.5 服务端发送数据-指定特定客户端发送数据   1.6 服务端停止监听和断开 3.QTtcp客户端 1…

Linux学习笔记————C 语言版 LED 灯实验

这里写目录标题 一、实验程序编写二、 汇编部分实验程序编写三、C 语言部分实验程序编写四、编译下载验证 汇编 LED 灯实验中&#xff0c;我们讲解了如何使用汇编来编写 LED 灯驱动&#xff0c;实际工作中是很少用到汇编去写嵌入式驱动的&#xff0c;毕竟汇编太难&#xff0c;而…

股票价格预测 | Python使用BP神经网络和LSTM神经网络预测股票价格

文章目录 效果一览文章概述代码设计BP神经网络LSTM神经网络效果一览 文章概述 BP神经网络使用

css实现更改checkbox的样式;更改checkbox选中后的背景色;更改checkbox选中后的icon

<input class"check-input" type"checkbox"> .check-input {width: 16px;height: 16px;} /* 设置默认的checkbox样式 */input.check-input[type"checkbox"] {-webkit-appearance: none; /* 移除默认样式 */border: 1px solid #999;outl…

【图像分割】nnUnetV1与V2的Linux部署与应用命令

以前觉得麻烦&#xff0c;一直没用过nnunet&#xff0c;虽然知道它很火&#xff0c;最近一个契机&#xff0c;部署使用了一下nnunet&#xff0c;记录一下其部署和使用的方法与命令。 1、部署 首先&#xff0c;我有一个环境&#xff0c;这个环境可以是以前就有的&#xff0c;也可…

Python | Leetcode Python题解之第8题字符串转换整数atoi

题目&#xff1a; 题解&#xff1a; INT_MAX 2 ** 31 - 1 INT_MIN -2 ** 31class Automaton:def __init__(self):self.state startself.sign 1self.ans 0self.table {start: [start, signed, in_number, end],signed: [end, end, in_number, end],in_number: [end, end,…

LabVIEW齿轮箱噪声监测系统

LabVIEW齿轮箱噪声监测系统 齿轮箱作为机械设备的“心脏”&#xff0c;其健康状态对设备的性能有着重要的影响。传统的齿轮箱监测方法依赖于直接的振动信号分析&#xff0c;但这种方法不仅成本高昂&#xff0c;而且在安装和拆卸过程中可能对设备造成损害。针对这些问题&#x…

Aurora8b10b(2)上板验证

文章目录 前言一、AXI_Stream数据产生模块二、上板效果总结 前言 上一篇内容我们已经详细介绍了基于aurora8b10b IP核的设计&#xff0c;本文将基于此进一步完善并且进行上板验证。 设计思路及代码思路参考FPGA奇哥系列网课 一、AXI_Stream数据产生模块 AXIS协议是非常简单的…