【Flink】Flink 的八种分区策略(源码解读)

Flink 的八种分区策略(源码解读)

  • 1.继承关系图
    • 1.1 接口:ChannelSelector
    • 1.2 抽象类:StreamPartitioner
    • 1.3 继承关系图
  • 2.分区策略
    • 2.1 GlobalPartitioner
    • 2.2 ShufflePartitioner
    • 2.3 BroadcastPartitioner
    • 2.4 RebalancePartitioner
    • 2.5 RescalePartitioner
    • 2.6 ForwardPartitioner
    • 2.7 KeyGroupStreamPartitioner
    • 2.8 CustomPartitionerWrapper

Flink 包含 8 种分区策略,这 8 种分区策略(分区器)分别如下面所示,本文将从源码的角度解读每个分区器的实现方式。

在这里插入图片描述

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

1.继承关系图

1.1 接口:ChannelSelector

public interface ChannelSelector<T extends IOReadableWritable> {/*** 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).*/void setup(int numberOfChannels);/***根据当前的record以及Channel总数,*决定应将record发送到下游哪个Channel。*不同的分区策略会实现不同的该方法。*/int selectChannel(T record);/***是否以广播的形式发送到下游所有的算子实例*/boolean isBroadcast();
}

1.2 抽象类:StreamPartitioner

public abstract class StreamPartitioner<T> implementsChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {private static final long serialVersionUID = 1L;protected int numberOfChannels;@Overridepublic void setup(int numberOfChannels) {this.numberOfChannels = numberOfChannels;}@Overridepublic boolean isBroadcast() {return false;}public abstract StreamPartitioner<T> copy();
}

1.3 继承关系图

在这里插入图片描述

2.分区策略

2.1 GlobalPartitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)。

在这里插入图片描述

/*** 发送所有的数据到下游算子的第一个task(ID = 0)* @param <T>*/
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//只返回0,即只发送给下游算子的第一个taskreturn 0;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "GLOBAL";}
}

2.2 ShufflePartitioner

随机选择一个下游算子实例进行发送。

在这里插入图片描述

/*** 随机的选择一个channel进行发送* @param <T>*/
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private Random random = new Random();@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//产生[0,numberOfChannels)伪随机数,随机发送到下游的某个taskreturn random.nextInt(numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return new ShufflePartitioner<T>();}@Overridepublic String toString() {return "SHUFFLE";}
}

2.3 BroadcastPartitioner

发送到下游所有的算子实例。

在这里插入图片描述

/*** 发送到所有的channel*/
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;/*** Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道*/@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");}@Overridepublic boolean isBroadcast() {return true;}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "BROADCAST";}
}

2.4 RebalancePartitioner

通过循环的方式依次发送到下游的 task

在这里插入图片描述

/***通过循环的方式依次发送到下游的task* @param <T>*/
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo;@Overridepublic void setup(int numberOfChannels) {super.setup(numberOfChannels);//初始化channel的id,返回[0,numberOfChannels)的伪随机数nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {//循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2//则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "REBALANCE";}
}

2.5 RescalePartitioner

基于上下游 Operator 的并行度,将记录以循环的方式输出到下游 Operator 的每个实例。

举例:

  • 上游并行度是 2,下游是 4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
  • 若上游并行度是 4,下游并行度是 2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;private int nextChannelToSendTo = -1;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {if (++nextChannelToSendTo >= numberOfChannels) {nextChannelToSendTo = 0;}return nextChannelToSendTo;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "RESCALE";}
}

Flink 中的执行图可以分成四层:StreamGraphJobGraphExecutionGraph物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “”,并不是一个具体的数据结构。

StreamingJobGraphGenerator 就是 StreamGraph 转换为 JobGraph。在这个类中,把 ForwardPartitionerRescalePartitioner 列为 POINTWISE 分配模式,其他的为 ALL_TO_ALL 分配模式。代码如下:

if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)DistributionPattern.POINTWISE,resultPartitionType);} else {jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex,// 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)DistributionPattern.ALL_TO_ALL,resultPartitionType);}

2.6 ForwardPartitioner

发送到下游对应的第一个 task,保证上下游算子并行度一致,即上游算子与下游算子是 1 : 1 1:1 1:1 的关系。

在这里插入图片描述

/*** 发送到下游对应的第一个task* @param <T>*/
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {return 0;}public StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "FORWARD";}
}

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner,对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。

//在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {partitioner = new RebalancePartitioner<Object>();
}if (partitioner instanceof ForwardPartitioner) {//如果上下游的并行度不一致,会抛出异常if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {throw new UnsupportedOperationException("Forward partitioning does not allow " +"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}
}

2.7 KeyGroupStreamPartitioner

根据 key 的分组索引选择发送到相对应的下游 subtask

在这里插入图片描述

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
/*** 根据key的分组索引选择发送到相对应的下游subtask* @param <T>* @param <K>*/
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
...@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);}//调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);}
...
}
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
.../*** 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,* 即该key发送到哪一个task*/public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}/***根据key分配一个分组id(keyGroupId)*/public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");//获取key的hashcodereturn computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}/*** 根据key分配一个分组id(keyGroupId),*/public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {//与maxParallelism取余,获取keyGroupIdreturn MathUtils.murmurHash(keyHash) % maxParallelism;}//计算分区index,即该key group应该发送到下游的哪一个算子实例public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}
...

2.8 CustomPartitionerWrapper

通过 Partitioner 实例的 Partition 方法(自定义的)将记录输出到下游。

public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {private static final long serialVersionUID = 1L;Partitioner<K> partitioner;KeySelector<T, K> keySelector;public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {this.partitioner = partitioner;this.keySelector = keySelector;}@Overridepublic int selectChannel(SerializationDelegate<StreamRecord<T>> record) {K key;try {key = keySelector.getKey(record.getInstance().getValue());} catch (Exception e) {throw new RuntimeException("Could not extract key from " + record.getInstance(), e);}//实现Partitioner接口,重写partition方法return partitioner.partition(key, numberOfChannels);}@Overridepublic StreamPartitioner<T> copy() {return this;}@Overridepublic String toString() {return "CUSTOM";}
}

比如:

public class CustomPartitioner implements Partitioner<String> {// key: 根据key的值来分区// numPartitions: 下游算子并行度@Overridepublic int partition(String key, int numPartitions) {return key.length() % numPartitions;//在此处定义分区策略}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526081.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全栈的自我修养 ———— css中常用的布局方法flex和grid

在项目里面有两种常用的主要布局:flex和grid布局&#xff08;b站布局&#xff09;&#xff0c;今天分享给大家这两种的常用的简单方法&#xff01; 一、flex布局1、原图2、中心对齐3、主轴末尾或者开始对其4、互相间隔 二、grid布局1、基本效果2、加间隔3、放大某一个元素 一、…

政安晨:【深度学习处理实践】(四)—— 实施一个温度预测示例

在开始使用像黑盒子一样的深度学习模型解决温度预测问题之前&#xff0c;我们先尝试一种基于常识的简单方法。 它可以作为一种合理性检查&#xff0c;还可以建立一个基准&#xff0c;更高级的机器学习模型需要超越这个基准才能证明其有效性。对于一个尚没有已知解决方案的新问…

HTML 学习笔记(四)图片

<!--通过图片标签"<img src "图片路径">"来调用图片在网页中进行显示--> <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthd…

Linux网络套接字之预备知识

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ 目录 一、预备知识 1.理解源IP地址和目的IP地址 …

Chain of Verification(验证链、CoVe)—理解与实现

原文地址&#xff1a;Chain of Verification (CoVe) — Understanding & Implementation 2023 年 10 月 9 日 GitHub 存储库 介绍 在处理大型语言模型&#xff08;LLM&#xff09;时&#xff0c;一个重大挑战&#xff0c;特别是在事实问答中&#xff0c;是幻觉问题。当答案…

Day27:安全开发-PHP应用TP框架路由访问对象操作内置过滤绕过核心漏洞

目录 TP框架-开发-配置架构&路由&MVC模型 TP框架-安全-不安全写法&版本过滤绕过 思维导图 PHP知识点 功能&#xff1a;新闻列表&#xff0c;会员中心&#xff0c;资源下载&#xff0c;留言版&#xff0c;后台模块&#xff0c;模版引用&#xff0c;框架开发等 技…

时间感知自适应RAG(TA-ARE)

原文地址&#xff1a;Time-Aware Adaptive RAG (TA-ARE) 2024 年 3 月 1 日 介绍 随着大型语言模型&#xff08;LLM&#xff09;的出现&#xff0c;出现了新兴能力的概念。前提或假设是LLMs具有隐藏的和未知的能力&#xff0c;等待被发现。企业家们渴望在LLMs中发现一些无人知晓…

Linux学习:权限

目录 1. shell命令的工作原理与存在意义1.1 shell命令解释器存在的意义1.2 shell解释器的工作原理 2. Linux操作系统&#xff1a;用户2.1 什么是用户2.2 用户的切换操作2.3 用户权限划分的意义 3. Linux中权限的种类和意义3.1 什么是权限3.2 sudo指令与短暂提权 4. 文件类型与文…

Django模型层(附带test环境)

Django模型层(附带test环境) 目录 Django模型层(附带test环境)连接数据库Django ORM在models.py中建表允许为空指定默认值数据库迁移命令 开启测试环境建表语句补充(更改默认表名)数据的增加时间数据的时区 多表数据的增加一对多多对多 数据的删除修改数据查询数据查询所有数据…

【Claude 3】一文谈谈Anthropic(Claude) 亚马逊云科技(Bedrock)的因缘际会

文章目录 前言1. Anthropic的诞生2. Anthropic的“代表作”——Claude 3的“三驾马车”3. 亚马逊云科技介绍4. 强大的全托管服务平台——Amazon Bedrock5. 亚马逊云科技(AWS)和Anthropic的联系6. Claude 3模型与Bedrock托管平台的关系7. Clude 3限时体验入口分享【⚠️截止3月1…

力扣---腐烂的橘子

题目&#xff1a; bfs思路&#xff1a; 感觉bfs还是很容易想到的&#xff0c;首先定义一个双端队列&#xff08;队列也是可以的~&#xff09;&#xff0c;如果值为2&#xff0c;则入队列&#xff0c;我这里将队列中的元素定义为pair<int,int>。第一个int记录在数组中的位…

Diddler抓包工具——学习笔记

F12抓包 302【重定向】&#xff1a;当你发送了一个请求之后&#xff0c;那么这个请求重定向到了另外的资源 跳转和重定向的区别&#xff1a; 跳转是会把数据传到新的地址 重定向不会把新的数据传到新的地址 使用F12抓包时一定要打开Preserve Log开关&#xff0c;作用是保留…