Flink之Partitioner(分区规则)

Flink之Partitioner(分区规则)

方法注释
global()全部发往1个task
broadcast()广播(前面的文章讲解过,这里不做阐述)
forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事
shuffle()随机分配(只是随机,同Spark的shuffle不同)
rebalance()轮询分配,默认机制就是rebalance()
recale()一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask.
partitionCustom自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容)
keyBy()根据数据key的HashCode进行Hash分配
  • global

    global在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global就是将上游的数据全部发往下游的第一个subtask中,也就是说下游设置再多的并行度是没意义的,所以使用global的时候,下游的task的并行度都是1.
    在这里插入图片描述
    这里结合代码看一下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置并行度为3,且设置数据分区方式为globalDataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();// 切分字符串,设置并行度为1SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(1);//......env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看代码中upperCaseMapStreamsplitFlatMapStream之间数据的发送方式
    在这里插入图片描述

  • forward

    forward其实就是一对一发送数据,和之前讲解Task的文章中提到的算子之间OneToOne的模式是一样的,就是可以将forward理解为同一个task chain[算子链]中算子之间的数据传输方式,但是使用forward的前提是上下游的算子并行度是一致的也就是上下游的subtask数量保持一致,图解如下:
    在这里插入图片描述

    代码内容如下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为forward分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • shuffle

    通过前面WebUI的图片我们可以看到,从Socket数据源将数据发送到第一个map的时候使用的是默认的rebalance方式,也就是轮询发送的方式,而这里说的shuffle虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3subtask,下游有5subtask,数据源有5条数据,上游中的某一个subtask向下游发送数据时,是随机发送的,下游的5subtask并不是每个都一定能接受到数据,可能有的接收到1条,有的接收到2条,有的接收到3条数据,这就是shuffle发送数据的方式.

    如果说上两个operator并行度一致,上游选择了shuffle发送数据的方式,那么两个operator会绑定成一个task chain么?不会,因为shuffle的数据发送方式就已经导致两个operator不是OneToOne的模式了.
    在这里插入图片描述
    代码示例:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为shuffle分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(7)// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • Rebalance

    rebalance就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
    在这里插入图片描述

  • recale

    关于recale前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.

    Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
    在这里插入图片描述

    当上下游对应的subtask分组后,上下游组内的subtak就会以组内轮询的方式发送数据,如下图:
    在这里插入图片描述

  • keyBy

    keyBy使用的HASH分区方式,实际是hashCode() + murmurHash()的组合方式,这个在源码的KeyGroupRangeAssignment类中是可以看到的,简单来说根据keyhash值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;).

    关于keyBy的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
    在这里插入图片描述

以上就是对Flink中分区规则的讲解.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/69912.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信息与通信工程面试准备——数学知识|正态分布|中心极限定理

目录 正态分布 正态分布的参数 正态分布的第一个参数是均值 正态分布的第二个参数是标准差SD 所有正态分布的共同特征 标准正态分布&#xff1a;正态分布的特例 中心极限定理 理解定义 示例# 1 示例# 2 知道样本均值总是正态分布的实际含义是什么&#xff1f; 正态分…

pytorch_lightning报错 You requested gpu: [1],But your machine only has: [0]

pytorch_lightning报错 You requested gpu: [1]&#xff0c;But your machine only has: [0] 问题及分析 报错图片如下&#xff1a; 分析 gpu:[1]指代的gpu的标号&#xff0c;如果笔记本中只包含一个GPU&#xff0c;一般序号为[0].所以无法找到程序指定的GPU。 解决方法 …

语聚AI公测发布,大语言模型时代下新的生产力工具

语聚AI 公测发布 距离语聚AI内测上线已经过去近1个月。 这期间&#xff0c;我们共邀请了近百位资深用户与行业专家加入语聚AI产品体验。通过大家的热情参与积极反馈&#xff0c;我们不断优化并完善了语聚AI的功能与使用体验。 经过研发团队不懈的努力&#xff0c;今天语聚AI终…

Java数字化智慧工地管理云平台源码(人工智能、物联网、大数据)

智慧工地优势&#xff1a;"智慧工地”将施工企业现场视频管理、建筑起重机械安全监控、现场从业人员管理、物料管理、进度管理、扬尘噪声监测等现场设备有机、高效、科学、规范的结合起来真正实现工程项目业务流与现场各类监控源数据流的有效结合与深度配合&#xff0c;实…

【等保测评】等保初级测评师试题合集(3w字汇总)

【等保测评】信息安全等级保护初级测评师试题合集 一、法律法规单选多选判断 二、实施指南单选多选 三、定级指南四、基本要求五、测评准则六、信息安全等级测评模拟模拟试题1一、单选二、多选三、判断四、简答 模拟试题2一、单选二、多选三、判断四、简答 模拟试题3一、单选二…

深度学习实战48-【未来的专家团队】基于AutoCompany模型的自动化企业概念设计与设想

大家好,我是微学AI,今天给大家介绍一下深度学习实战48-【未来的专家团队】基于AutoCompany模型的自动化企业概念设计与设想,文本将介绍AutoCompany模型的概念设计,涵盖了AI智能公司的各个角色,并结合了GPT-4接口来实现各个角色的功能,设置中央控制器,公司运作过程会生成…

【git clone error:no matching key exchange method found】

拉起项目代码报错 git clone ssh://uidxxxgerrit-xxxxxxxx Cloning into ‘xxxxx’… Unable to negotiate with xxx.xx.xxx.ip port xxxxx: no matching key exchange method found. Their offer: diffie-hellman-group14-sha1,diffie-hellman-group1-sha1 fatal: Could not …

通用与垂直大模型之战:大模型驱动的商业智能变革之路

科技云报道原创。 是做通用大模型还是垂直大模型&#xff0c;这一个争论在“百模大战”的下讨论愈发热烈。 目前&#xff0c;以微软、谷歌、百度、阿里等为代表的发力于通用大模型的科技大厂&#xff0c;也都开始推动大模型在垂直领域的商业化落地。 比如说&#xff0c;微软…

【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总

【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总 TI编译器分类 在CCS按照目录下 有个名为${CG_TOOL_ROOT}的目录 其下就是当前工程的编译器 存放目录为&#xff1a; C:\ti\ccs1240\ccs\tools\compiler按类型分为五种&#xff1a; ti-cgt-arm…

软件测试常用工具总结(测试管理、单元测试、接口测试、自动化测试、性能测试、负载测试等)

前言 在软件测试的过程中&#xff0c;多多少少都是会接触到一些测试工具&#xff0c;作为辅助测试用的&#xff0c;以提高测试工作的效率&#xff0c;使用好了测试工具&#xff0c;能对测试起到一个很好的作用&#xff0c;同时&#xff0c;有些公司&#xff0c;也会要求掌握一…

在Orangepi5开发板3588s使用opencv获取摄像头画面

先感谢香橙派群的管理员耐心指导&#xff0c;经过不断的调试修改最后成功通过opencv调用mipi摄像头获取画面 就记录分享一下大概步骤希望大家少踩点坑&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 我用的固件系统是ubuntu2022.0.4 固件是&#x…