Flink StreamGraph Generation Process

Table of Contents

    • Overview
    • Core StreamGraph Objects
    • The StreamGraph Generation Process

Overview

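In Flink, the StreamGraph is the logical representation of a dataflow: it describes which transformations a Flink job applies to its data streams and how they are connected. It is the basis from which Flink derives the execution plan used at runtime.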
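An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The client converts the StreamGraph into a JobGraph and submits the JobGraph to the Flink cluster; the cluster converts the JobGraph into an ExecutionGraph, after which the job enters the scheduling and execution phase.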

Core StreamGraph Objects

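  • StreamNode
    A StreamNode is a node in the StreamGraph and is converted from a Transformation; roughly speaking, one StreamNode represents one operator. Logically, a StreamGraph contains both physical StreamNodes and virtual ones (for example, for partitioning or side outputs). A StreamNode can have multiple inputs and multiple outputs.
    Physical StreamNodes eventually become physical operators. Virtual StreamNodes do not become operators; instead, their information is attached to StreamEdges.
  • StreamEdge
    A StreamEdge is an edge in the StreamGraph that connects two StreamNodes; a StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side-output tag, the partitioner, and output selection.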
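
To make the physical/virtual distinction concrete, here is a minimal, self-contained Java sketch. It does not use Flink's classes: `MiniStreamGraph`, `addOperator`, `addVirtualPartition` and `addEdge` are hypothetical names chosen for illustration, and partitioners are plain strings. It models the idea described above: a virtual node (here a partitioning step) never becomes a node of its own; when an edge starts from a virtual id, the edge is re-attached to the real upstream node and the partitioner is stored on the edge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (not Flink's API): physical nodes become
// operators, while virtual partition nodes are folded into edges.
class MiniStreamGraph {

    // A physical node: stands for one operator.
    static final class Node {
        final int id;
        final String operatorName;

        Node(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    // An edge between two physical nodes; the partitioner travels on the edge.
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Node> nodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();
    // Virtual id -> upstream id / partitioner. No Node is ever created for these.
    private final Map<Integer, Integer> virtualUpstream = new HashMap<>();
    private final Map<Integer, String> virtualPartitioner = new HashMap<>();

    void addOperator(int id, String name) {
        nodes.put(id, new Node(id, name));
    }

    void addVirtualPartition(int virtualId, int upstreamId, String partitioner) {
        virtualUpstream.put(virtualId, upstreamId);
        virtualPartitioner.put(virtualId, partitioner);
    }

    void addEdge(int sourceId, int targetId) {
        addEdge(sourceId, targetId, null);
    }

    // A virtual source id is resolved to its upstream node; if several virtual
    // nodes are chained, the partitioner closest to the target wins.
    private void addEdge(int sourceId, int targetId, String partitioner) {
        Integer upstream = virtualUpstream.get(sourceId);
        if (upstream != null) {
            String p = partitioner != null ? partitioner : virtualPartitioner.get(sourceId);
            addEdge(upstream, targetId, p);
            return;
        }
        // This sketch simply defaults to "forward" when nothing was specified.
        edges.add(new Edge(sourceId, targetId, partitioner == null ? "forward" : partitioner));
    }
}
```

For example, source (1), then a partitioning step (virtual 2), then a map (3) ends up as two nodes and a single edge 1->3 carrying the partitioner, labelled "hash" in this sketch.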

The StreamGraph Generation Process

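The StreamGraph is generated in the Flink client. At submission time the client invokes the Flink application's main method; the user's business logic assembles a pipeline of Transformations, and building the StreamGraph starts when StreamExecutionEnvironment.execute() is finally called.
The StreamGraph is therefore generated before the job is submitted, and the entry point for generating it is in StreamExecutionEnvironment: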

    @Internal
    public StreamGraph getStreamGraph() {
        return this.getStreamGraph(this.getJobName());
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return this.getStreamGraph(jobName, true);
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
        return streamGraph;
    }

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (this.transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        } else {
            RuntimeExecutionMode executionMode = this.configuration.get(ExecutionOptions.RUNTIME_MODE);
            return new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration())
                    .setRuntimeExecutionMode(executionMode)
                    .setStateBackend(this.defaultStateBackend)
                    .setChaining(this.isChainingEnabled)
                    .setUserArtifacts(this.cacheFile)
                    .setTimeCharacteristic(this.timeCharacteristic)
                    .setDefaultBufferTimeout(this.bufferTimeout);
        }
    }
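
The practical consequence of getStreamGraph(jobName, clearTransformations) is easy to miss, so here is a hedged, Flink-free Java sketch of the same bookkeeping. `MiniEnvironment` and its methods are hypothetical stand-ins, and the "graph" is just a copy of the recorded steps. It shows that API calls only record transformations, that an empty list triggers the IllegalStateException from getStreamGraphGenerator(), and that the clearing variant (the one reached through getStreamGraph(jobName), which execute() uses) empties the list, so a second execute() without new operators fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamExecutionEnvironment's bookkeeping (not the
// real Flink class): API calls only append to a list; building happens later.
class MiniEnvironment {
    private final List<String> transformations = new ArrayList<>();

    // Roughly what DataStream calls do: record the step, build nothing yet.
    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Mirrors getStreamGraph(jobName, clearTransformations).
    List<String> getStreamGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        // Stand-in for StreamGraphGenerator.generate(): snapshot the steps.
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    // execute() goes through the clearing variant, so calling it a second time
    // without adding operators hits the IllegalStateException above.
    List<String> execute() {
        return getStreamGraph(true);
    }
}
```

Calling getStreamGraph(false) first is a way to inspect the graph without consuming the recorded transformations.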

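The StreamGraph is actually built by StreamGraphGenerator. generate() iterates over the transformations registered in the environment and calls transform() on each one. Before translating a transformation, transform() recursively transforms its inputs (via getParentInputIds() in translate()), tracing back upstream toward the SourceTransformations, so the StreamGraph is built up during the traversal itself. The alreadyTransformed map ensures each transformation is translated only once, even when it is reached through several downstream paths. The code listing is as follows: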


    @Internal
    public class StreamGraphGenerator {

        private final List<Transformation<?>> transformations;
        private StateBackend stateBackend;
        private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
        protected static Integer iterationIdCounter;
        private StreamGraph streamGraph;
        private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;
        // ... other fields (LOG, chaining, jobName, executionConfig, configuration, etc.) omitted in this excerpt

        public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
            this(transformations, executionConfig, checkpointConfig, new Configuration());
        }

        public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
            this.chaining = true;
            this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
            this.jobName = "Flink Streaming Job";
            this.savepointRestoreSettings = SavepointRestoreSettings.none();
            this.defaultBufferTimeout = -1L;
            this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
            this.transformations = Preconditions.checkNotNull(transformations);
            this.executionConfig = Preconditions.checkNotNull(executionConfig);
            this.checkpointConfig = new CheckpointConfig(checkpointConfig);
            this.configuration = Preconditions.checkNotNull(configuration);
        }

        public StreamGraph generate() {
            this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
            this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
            this.configureStreamGraph(this.streamGraph);
            this.alreadyTransformed = new HashMap<>();
            // Translate every registered transformation; inputs are handled recursively.
            for (Transformation<?> transformation : this.transformations) {
                this.transform(transformation);
            }
            StreamGraph builtStreamGraph = this.streamGraph;
            this.alreadyTransformed.clear();
            this.alreadyTransformed = null;
            this.streamGraph = null;
            return builtStreamGraph;
        }

        private Collection<Integer> transform(Transformation<?> transform) {
            // Already translated (reached via another path): reuse its node ids.
            if (this.alreadyTransformed.containsKey(transform)) {
                return this.alreadyTransformed.get(transform);
            } else {
                LOG.debug("Transforming " + transform);
                if (transform.getMaxParallelism() <= 0) {
                    int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                    if (globalMaxParallelismFromConfig > 0) {
                        transform.setMaxParallelism(globalMaxParallelismFromConfig);
                    }
                }
                // Called for its side effect: fails fast if the output type could not be determined.
                transform.getOutputType();
                // Pick the translator registered for this Transformation class (see the static block).
                TransformationTranslator<?, Transformation<?>> translator =
                        (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());
                Collection<Integer> transformedIds;
                if (translator != null) {
                    transformedIds = this.translate(translator, transform);
                } else {
                    // Only (Co)FeedbackTransformation still goes through the legacy path.
                    transformedIds = this.legacyTransform(transform);
                }
                if (!this.alreadyTransformed.containsKey(transform)) {
                    this.alreadyTransformed.put(transform, transformedIds);
                }
                return transformedIds;
            }
        }

        private Collection<Integer> legacyTransform(Transformation<?> transform) {
            Collection<Integer> transformedIds;
            if (transform instanceof FeedbackTransformation) {
                transformedIds = this.transformFeedback((FeedbackTransformation<?>) transform);
            } else {
                if (!(transform instanceof CoFeedbackTransformation)) {
                    throw new IllegalStateException("Unknown transformation: " + transform);
                }
                transformedIds = this.transformCoFeedback((CoFeedbackTransformation<?>) transform);
            }
            if (transform.getBufferTimeout() >= 0L) {
                this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            } else {
                this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
            }
            if (transform.getUid() != null) {
                this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }
            if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()
                    && transform instanceof PhysicalTransformation
                    && transform.getUserProvidedNodeHash() == null
                    && transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
            } else {
                if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                    this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
                }
                this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
                return transformedIds;
            }
        }

        private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
            if (this.shouldExecuteInBatchMode) {
                throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
            } else if (iterate.getFeedbackEdges().size() <= 0) {
                throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
            } else {
                List<Transformation<?>> inputs = iterate.getInputs();
                Preconditions.checkState(inputs.size() == 1);
                Transformation<?> input = inputs.get(0);
                List<Integer> resultIds = new ArrayList<>();
                Collection<Integer> inputIds = this.transform(input);
                resultIds.addAll(inputIds);
                // The recursive call above might already have translated this iteration.
                if (this.alreadyTransformed.containsKey(iterate)) {
                    return this.alreadyTransformed.get(iterate);
                } else {
                    Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                    StreamNode itSource = itSourceAndSink.f0;
                    StreamNode itSink = itSourceAndSink.f1;
                    this.streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(this.executionConfig));
                    this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), null, null);
                    resultIds.add(itSource.getId());
                    // Register before translating the feedback edges, which loop back to this iteration.
                    this.alreadyTransformed.put(iterate, resultIds);
                    List<Integer> allFeedbackIds = new ArrayList<>();
                    for (Transformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
                        Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                        allFeedbackIds.addAll(feedbackIds);
                        for (Integer feedbackId : feedbackIds) {
                            this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                        }
                    }
                    String slotSharingGroup = this.determineSlotSharingGroup(null, allFeedbackIds);
                    if (slotSharingGroup == null) {
                        slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                    }
                    itSink.setSlotSharingGroup(slotSharingGroup);
                    itSource.setSlotSharingGroup(slotSharingGroup);
                    return resultIds;
                }
            }
        }

        private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
            if (this.shouldExecuteInBatchMode) {
                throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
            } else {
                Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
                StreamNode itSource = itSourceAndSink.f0;
                StreamNode itSink = itSourceAndSink.f1;
                this.streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(this.executionConfig));
                this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), null, null);
                Collection<Integer> resultIds = Collections.singleton(itSource.getId());
                this.alreadyTransformed.put(coIterate, resultIds);
                List<Integer> allFeedbackIds = new ArrayList<>();
                for (Transformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
                    Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                    allFeedbackIds.addAll(feedbackIds);
                    for (Integer feedbackId : feedbackIds) {
                        this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                    }
                }
                String slotSharingGroup = this.determineSlotSharingGroup(null, allFeedbackIds);
                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
                return Collections.singleton(itSource.getId());
            }
        }

        private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
            Preconditions.checkNotNull(translator);
            Preconditions.checkNotNull(transform);
            // Recursively translate all inputs first.
            List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
            // The recursive call above might already have translated this transformation.
            if (this.alreadyTransformed.containsKey(transform)) {
                return this.alreadyTransformed.get(transform);
            } else {
                String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
                Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
                return this.shouldExecuteInBatchMode
                        ? translator.translateForBatch(transform, context)
                        : translator.translateForStreaming(transform, context);
            }
        }

        private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
            List<Collection<Integer>> allInputIds = new ArrayList<>();
            if (parentTransformations == null) {
                return allInputIds;
            } else {
                for (Transformation<?> transformation : parentTransformations) {
                    allInputIds.add(this.transform(transformation));
                }
                return allInputIds;
            }
        }

        private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
            if (specifiedGroup != null) {
                return specifiedGroup;
            } else {
                String inputGroup = null;
                for (int id : inputIds) {
                    String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                    if (inputGroup == null) {
                        inputGroup = inputGroupCandidate;
                    } else if (!inputGroup.equals(inputGroupCandidate)) {
                        return "default";
                    }
                }
                return inputGroup == null ? "default" : inputGroup;
            }
        }

        static {
            DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
            // One translator per Transformation type; used by transform() above.
            Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap<>();
            tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
            tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
            tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
            tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
            tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
            tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
            tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
            tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
            tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
            tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
            tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
            tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
            tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
            tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
            tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
            translatorMap = Collections.unmodifiableMap(tmp);
            iterationIdCounter = 0;
        }
    }
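
The listing mixes the core recursion with iteration and legacy handling. The following self-contained Java sketch isolates only the recursion; `MiniGenerator`, its nested `Transformation` classes and the `Translator` interface are hypothetical stand-ins, not Flink code. It reproduces three properties visible in the listing: inputs are transformed before the transformation itself (as in getParentInputIds()), results are memoized in `alreadyTransformed`, and the translator is looked up in a map keyed by the transformation's class. An unknown class simply throws here, whereas the real code first falls back to legacyTransform(), which throws for anything that is not a feedback transformation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of StreamGraphGenerator's traversal (not Flink code):
// inputs first, memoized per transformation, translator chosen by class.
class MiniGenerator {

    static class Transformation {
        final int id;
        final List<Transformation> inputs;

        Transformation(int id, Transformation... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static class SourceTransformation extends Transformation {
        SourceTransformation(int id) {
            super(id);
        }
    }

    static class OneInputTransformation extends Transformation {
        OneInputTransformation(int id, Transformation input) {
            super(id, input);
        }
    }

    // Stand-in for TransformationTranslator: creates the node and its in-edges.
    interface Translator {
        Collection<Integer> translate(Transformation t, List<Collection<Integer>> inputIds);
    }

    final List<Integer> nodeOrder = new ArrayList<>(); // order in which nodes were created
    final List<String> edges = new ArrayList<>();      // "from->to"
    private final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    private final Map<Class<?>, Translator> translatorMap = new HashMap<>();

    MiniGenerator() {
        Translator physical = (t, inputIds) -> {
            nodeOrder.add(t.id);
            for (Collection<Integer> ids : inputIds) {
                for (int from : ids) {
                    edges.add(from + "->" + t.id);
                }
            }
            return Collections.singletonList(t.id);
        };
        translatorMap.put(SourceTransformation.class, physical);
        translatorMap.put(OneInputTransformation.class, physical);
    }

    // Mirrors generate(): visit the registered transformations in order.
    void generate(List<Transformation> transformations) {
        for (Transformation t : transformations) {
            transform(t);
        }
    }

    Collection<Integer> transform(Transformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached; // reached again via another path: reuse the ids
        }
        // Like getParentInputIds(): translate every input before t itself.
        List<Collection<Integer>> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.add(transform(input));
        }
        Translator translator = translatorMap.get(t.getClass());
        if (translator == null) {
            throw new IllegalStateException("Unknown transformation: " + t.id);
        }
        Collection<Integer> ids = translator.translate(t, inputIds);
        alreadyTransformed.put(t, ids);
        return ids;
    }
}
```

With one source feeding two maps, only the maps are registered, yet the source node is created first and exactly once: the node order is 1, 2, 3 and the edges are 1->2 and 1->3.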

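determineSlotSharingGroup() at the end of the listing decides which slot sharing group a new node joins. Below is a hedged, standalone restatement of that rule in plain Java; `SlotSharing.determine` and the `groupOfNode` map (standing in for `streamGraph.getSlotSharingGroup(id)`) are hypothetical names.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical standalone copy of the decision made by
// StreamGraphGenerator.determineSlotSharingGroup() in the listing above.
final class SlotSharing {
    private SlotSharing() {}

    static String determine(String specifiedGroup,
                            Collection<Integer> inputIds,
                            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup;      // an explicitly set group always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate; // first input decides tentatively
            } else if (!inputGroup.equals(candidate)) {
                return "default";       // inputs disagree: fall back
            }
        }
        // No inputs (e.g. a source) or no group found: "default".
        return inputGroup == null ? "default" : inputGroup;
    }
}
```

In other words, an operator whose inputs come from different slot sharing groups falls back to "default" unless a group is set on it explicitly (for example with slotSharingGroup(...) in the DataStream API).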
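This article was contributed by an Internet user; its views are solely the author's and do not represent the position of this site. This site only provides information storage space, does not own the copyright, and bears no related legal liability. If you reprint it, please credit the source: http://www.hqwc.cn/news/518280.html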
