Pulsar消息路由深入剖析

一、概述

大数据背景下,分区应该是所有组件必备的基本条件,否则面对海量数据时无论是计算还是存储都容易遇到瓶颈。跟其他消息系统一样,Pulsar通过Topic将消息数据进行业务层面划分管理,同时也支持Topic分区,通过将多个分区分布在多台Broker/机器上从而带来性能上的巨大提升以及无限的横向拓展能力。而一旦有了分区之后就会面临一个问题,但一条数据请求时应该将其发往哪个分区?目前Pulsar跟其他消息系统一样支持以下三种路由模式。

  1. 轮询路由
    生产者会按将消息按批为单位轮询发送到不同的分区,这是一种常见的路由策略,具有简单的优势,由于它不需要过多的配置以及考虑但却可以表现不错的性能。如果消息带有key的话会根据key进行哈希运算后再对分区进行取模来决定消息投放的目标分区。
  2. 单分区路由
    单分区路由提供一种更简单的机制,它会将所有消息路由到同一个分区。这种模式类似非分区Topic,如果消息提供key的话将恢复到轮询哈希路由方式
  3. 自定义分区路由
    自定义分区路由支持你通过实现MessageRouter接口来自定义路由逻辑,例如将特定key的消息发到指定的分区等

二、实战

消息路由发生在生产者端,在创建生产者是通过 .messageRoutingMode() 进行指定,下面就分别实战对比下这三种的路由效果

1. 轮询路由

先试试轮询路由的策略,这是最常见也是默认的路由策略,通过 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) 进行指定,然后往里面通过同步方式往分区Topic里面写入数据

        String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2").messageRoutingMode(MessageRoutingMode.RoundRobinPartition)//.messageRoutingMode(MessageRoutingMode.SinglePartition).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}

通过管理页面可以看到数据基本均匀的落在各个分区,从这个结果是能够反向验证数据是符合轮询发送后的效果
在这里插入图片描述

2. 单分区路由

现在试试单分区路由的策略,通过 .messageRoutingMode(MessageRoutingMode.SinglePartition) 进行指定,并往分区Topic里面写入一批数据

        String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}

通过管理页面可以看到数据都落在第一个分区,说明这也符合官网中对单分区路由的描述。同时经过反复试验多次发现,生产者会随机选择一个分区并将所有数据发送到这个分区。
在这里插入图片描述

3. 自定义路由

在有些业务场景,我们需要将自己的业务逻辑“融入”路由策略,因此像Pulsar、Kafka等消息中间件都是支持用户进行路由规则的自定义的。这里为了好玩,咱们尝试将数据按照 1:2:3:4 等比例分别落在四个分区如何?说干就干,自定义路由也是比较简单的,只需要实现Pulsar MessageRouter接口的choosePartition方法即可,实现逻辑如下

public class SherlockRouter implements MessageRouter {Long count = 0L;public int choosePartition(Message<?> msg, TopicMetadata metadata) {count++;count = count % 10;if (count == 0) return 0;if (count < 3) return 1;if (count < 6) return 2;return 3;}
}

通过上面代码可以看到,参数msg就是生产者中国呢发送的消息对象,metadata是这条消息的元数据如租户、命名空间等等,而返回值其实就是这个Topic分区的下标,这里需要注意的是不要超过Topic的分区数,同时一些比较复杂的数据处理逻辑代码尽量不要写在这里影响消息发送性能以及不规范。

写完后通过 .messageRouter() 方法进行指定即可使用

    public static void customRoundSchemaProducer() throws Exception {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3").messageRouter(new SherlockRouter()).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}producer.close();pulsarClient.close();}

在管理页面可以看到,数据是按照咱们预期的逻辑 1:2:3:4等比落在分区里面,嘿嘿~
在这里插入图片描述

三、源码分析

1. 接口以及父类

Pulsar中所有路由规则都是基于MessageRouter接口进行实现的,这个接口主要提供了choosePartition方法,只要重写这个方法即可自定义任意自己预期的逻辑

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MessageRouter extends Serializable {/**** @param msg*            Message object* @return The index of the partition to use for the message* @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.*/@Deprecateddefault int choosePartition(Message<?> msg) {throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");}/*** Choose a partition based on msg and the topic metadata.** @param msg message to route* @param metadata topic metadata* @return the partition to route the message.* @since 1.22.0*/default int choosePartition(Message<?> msg, TopicMetadata metadata) {return choosePartition(msg);}}

MessageRouterBase是路由策略的抽象类,主要定义了消息有key时的哈希算法,像上面提的轮询路由和单分区路由继承了这个抽象类。JavaStringHash和Murmur3Hash32两个都是Pulsar提供的哈希算法的实现类,两者的差异后面再单独进行分析

public abstract class MessageRouterBase implements MessageRouter {private static final long serialVersionUID = 1L;protected final Hash hash;MessageRouterBase(HashingScheme hashingScheme) {switch (hashingScheme) {case JavaStringHash:this.hash = JavaStringHash.getInstance();break;case Murmur3_32Hash:default:this.hash = Murmur3Hash32.getInstance();}}
}

2. 轮询路由的实现

主要看choosePartition 方法的逻辑,首先如果消息带有key则针对key进行哈希然后取模,这样可以保证相同key的消息落在同一个分区。然后就是判断消息是否按批次进行发送的,如果是单条消息发送的话则通过一个累加计数器进行轮询分区,即可达到消息按照分区顺序逐个发送的效果;如果是按批次发送的话,则是根据时间戳进行取模,这样达到的效果就是每批数据都会随机发送到某一个分区

public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {@SuppressWarnings("unused")private volatile int partitionIndex = 0;private final int startPtnIdx;private final boolean isBatchingEnabled;private final long partitionSwitchMs;....@Overridepublic int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {// If the message has a key, it supersedes the round robin routing policyif (msg.hasKey()) {return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());}if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.long currentMs = clock.millis();return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());} else {return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());}}}

3. 单分区路由

可以看到单分区的逻辑是比较简单且清晰的,如果有key就进行哈希取模,否则就发送到partitionIndex这个成员变量指定的分区去,那么这个partitionIndex指定的是哪个分区呢?通过代码能看到是从构造函数里面传进来的,因此跟踪下代码看看

public class SinglePartitionMessageRouterImpl extends MessageRouterBase {private final int partitionIndex;public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {super(hashingScheme);this.partitionIndex = partitionIndex;}@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {// If the message has a key, it supersedes the single partition routing policyif (msg.hasKey()) {return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());}return partitionIndex;}}

通过跟踪可以看到是在PartitionedProducerImpl类的getMessageRouter方法中进行SinglePartitionMessageRouterImpl类的初始化,同时是通过ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()) 来生成一个小于分区数的随机数,因此单分区路由的分区是随机指定的一个,这个结果跟咱们实战中测试的效果是吻合的。除此之外,咱们还看到 getMessageRouter方法中会根据咱们在创建生产者时 .messageRoutingMode 方法指定的路由模式来创建对应的路由实现类,在这里可以明确的看到没有指定的话默认就是采用的轮询路由规则

private MessageRouter getMessageRouter() {MessageRouter messageRouter;MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();switch (messageRouteMode) {case CustomPartition:messageRouter = Objects.requireNonNull(conf.getCustomMessageRouter());break;case SinglePartition:messageRouter = new SinglePartitionMessageRouterImpl(ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme());break;case RoundRobinPartition:default:messageRouter = new RoundRobinPartitionMessageRouterImpl(conf.getHashingScheme(),ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),conf.isBatchingEnabled(),TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));}return messageRouter;}

四、总结

通过以上内容相信你对Pulsar的路由规则有一定的了解了,如果想进一步了解可以尝试按照自己喜好实现下路由规则并观测是否按照预期运行,同时也可以跟踪Pulsar的源码看看实现是否符合预期。如果想彻底掌握Pulsar,最好自己跟踪下Pulsar的一些核心逻辑,这样不仅了解其底层是如何运作的,也能加深你对一些设计/特性的印象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/537142.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

何为一致性哈希?一致性哈希和哈希有什么区别?一文深入理解一致性哈希

目录 一、前言二、哈希2.1、哈希碰撞2.2、针对哈希碰撞的两种方案2.3、为什么用哈希2.4、普通哈希的缺点 三、一致性哈希3.1、实现方式 - 哈希环3.2、场景复现3.3、优化版本的哈希环 四、总结 一、前言 在学到分布式负载均衡时&#xff0c;负载均衡的方式有很多种&#xff0c;…

*地宫取宝c++

题目 输入样例1&#xff1a; 2 2 2 1 2 2 1输出样例1&#xff1a; 2输入样例2&#xff1a; 2 3 2 1 2 3 2 1 5输出样例2&#xff1a; 14 思路 题目说从入口开始&#xff0c;只能向右或向下行走到达右下角&#xff0c;类似“摘花生”这道题的模型。题目又说只有当格子里的宝…

数据结构知识点总结00-知识点目录

专栏主页&#xff1a; 数据结构算法程序设计基础C语言知识点总结https://blog.csdn.net/seeker1994/category_12585732.html C语言知识点总结00-C语言知识点目录 最优算法100例00-最优算法100例目录 ...... 数据结构知识点目录 要求&#xff1a; &#xff08;1&#xff…

Devin,第一位AI软件工程师

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

微信小程序上传图片到服务端,springboot项目。避免踩坑保姆教程

多方查找终于搞懂了如何去上传文件到本地服务器 前端代码 <view class"operation_row common_mb0"><view class"upload_btn" bindtap"clickUpload"><image src"../../common/images/icon/icon02.png"></image&g…

跨境电商怎么使用动态住宅代理IP?

在数字化时代&#xff0c;隐私保护和信息安全成为全球网民的共同关切。特别是对于海外用户&#xff0c;由于地理位置和网络监管政策的不同&#xff0c;访问全球信息资源变得更加复杂。使用动态住宅IP搭建代理&#xff0c;作为解决这一问题的有效手段&#xff0c;动态IP代理通过…

qiankun:vite/webpack项目配置

相关博文&#xff1a; https://juejin.cn/post/7216536069285429285?searchId202403091501088BACFF113F980BA3B5F3 https://www.bilibili.com/video/BV12T411q7dq/?spm_id_from333.337.search-card.all.click qiankun结构&#xff1a; 主应用base&#xff1a;vue3historyv…

Vue3基础速成

Vue常用语法 {{ }} 变量、表达式渲染 {{ }} 用于输出对象属性和函数返回值 <div id"hello-vue" class"demo">{{ message }} </div><script>const HelloVueApp {data() {return {message: Hello Vue!!}}}Vue.createApp(HelloVueApp).…

Pytorch学习 day13(完整的模型训练步骤)

步骤一&#xff1a;定义神经网络结构 注意&#xff1a;由于一次batch_size的大小为64&#xff0c;表示一次放入64张图片&#xff0c;且Flatten()只会对单张图片的全部通道做拉直操作&#xff0c;也就是不会将batch_size合并&#xff0c;但是一张图片有3个通道&#xff0c;在Ma…

【网络安全】 MSF生成木马教程

本文章仅用于信息安全学习&#xff0c;请遵守相关法律法规&#xff0c;严禁用于非法途径。若读者因此作出任何危害网络安全的行为&#xff0c;后果自负&#xff0c;与作者无关。 环境准备&#xff1a; 名称系统IP攻击机Kali Linux10.3.0.231客户端Windows 710.3.0.234 一、生…

设计模式-行为型模式-模版方法模式

模板方法模式&#xff0c;定义一个操作中的算法的骨架&#xff0c;而将一些步骤延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。[DP] 模板方法模式是通过把不变行为搬移到超类&#xff0c;去除子类中的重复代码来体现它的优势。 //首…

在Linux/Ubuntu/Debian中使用windows应用程序/软件

Wine 是一个兼容层&#xff0c;允许你在类 Unix 操作系统&#xff08;包括 Ubuntu&#xff09;上运行 Windows 应用程序。 以下是在 Ubuntu 上安装和使用 Wine 的基本步骤&#xff1a; 在 Ubuntu 上安装 Wine&#xff1a; 更新软件包列表&#xff1a; 打开终端并运行以下命令以…