【Flink网络数据传输】OperatorChain的设计与实现

文章目录

    • 1.OperatorChain的设计与实现
    • 2.OperatorChain的创建和初始化
    • 3.创建RecordWriterOutput

1.OperatorChain的设计与实现

OperatorChain的大致逻辑

在JobGraph对象的创建过程中,将链化可以连在一起的算子,常见的有StreamMap、StreamFilter等类型的算子。OperatorChain中的所有算子都会被运行在同一个Task实例中。StreamTaskNetworkOutput会将接入的数据元素写入算子链的HeadOperator中,从而开启整个OperatorChain的数据处理。

 
OperatorChain的Output组件:将数据发送到下游

如图所示,在OperatorChain中通过Output组件将上下游算子相连,当上游算子数据处理完毕后,会通过Output组件发送到下游的算子中继续处理。

 
OperatorChain的collect():收集处理完的数据

如图所示,OperatorChain内部定义了WatermarkGaugeExposingOutput接口,且该接口分别继承了Output和Collector接口。Collector接口提供了collect()方法,用于收集处理完的数据。

 
OperatorChain的Output接口:也能输出Watermark和LatencyMarker等事件

Output接口提供了emitWatermark()、emitLatencyMarker()等方法,用于对Collector接口进行拓展,使得Output接口实现类可以输出Watermark和LatencyMarker等事件。WatermarkGaugeExposingOutput接口则提供了获取WatermarkGauge的方法,用于监控最新的Watermark。

 
OperatorChain内部定义了不同的WatermarkGaugeExposingOutput接口实现类。

  1. RecordWriterOutput:用于输出OperatorChain中尾端算子处理完成的数据,借助RecordWriter组件将数据元素写入网络
  2. ChainingOutput/CopyingChainingOutput:适用于上下游算子连接在一起且上游算子属于单输出类型的情况。
  3. BroadcastingOutputCollector/CopyingBroadcastingOutputCollector:上游算子是多输出类型但上下游算子之间的Selector为空时,创建广播类型的BroadcastingOutputCollector。
  4. DirectedOutput/CopyingDirectedOutput:上游算子是多输出类型且Selector不为空时,创建DirectedOutput或CopyingDirectedOutput连接上下游算子

在这里插入图片描述

例子:收集数据并通过Output发数据数据到下游

例如在WordCount的程序中定义flatMap()方法时,会调用Collector.collect()方法收集数据元素,每个算子在定义的函数或使用Output接口的实现类中,完成了上游算子向下游算子发送数据元素的操作

 

2.OperatorChain的创建和初始化

接下来我们看OperatorChain的初始化过程,如下代码,OperatorChain的构造器包含如下逻辑。

  1. 创建StreamOperator(即算子)实例,这里StreamOperator会封装为StreamOperatorFactory并存储在StreamGraph结构中。
  2. 获取算子之间的链接配置。chainedConfigs的配置决定了算子之间Output接口的具体实现。
  3. 遍历当前作业所有节点的输出边,并构建RecordWriterOutput组件,最终通过RecordWriterOutput组件将数据元素输出到网络中。
  4. 创建OperatorChain内部算子之间的上下游连接,完成OperatorChain内部上下游算子之间的数据传输
  5. 单独创建headOperator。headOperator是OperatorChain的头部节点,创建完成后将headOperator暴露到StreamTask实例,供DataOutput接口实现类调用
  6. 如果OperatorChain构建失败,则关闭实例,防止出现内存泄漏。
public OperatorChain(StreamTask<OUT, OP> containingTask,RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {// 获取当前StreamTask的userCodeClassloaderfinal ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();// 获取StreamConfigfinal StreamConfig configuration = containingTask.getConfiguration();// 获取StreamOperatorFactoryStreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(userCodeClassloader);// 读取chainedConfigsMap<Integer, StreamConfig> chainedConfigs = 
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);// 根据StreamEdge创建RecordWriterOutput组件List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];boolean success = false;try {for (int i = 0; i < outEdgesInOrder.size(); i++) {StreamEdge outEdge = outEdgesInOrder.get(i);// 为每个输出边创建RecordWriterOutputRecordWriterOutput<?> streamOutput = createStreamOutput(recordWriterDelegate.getRecordWriter(i),outEdge,chainedConfigs.get(outEdge.getSourceId()),containingTask.getEnvironment());this.streamOutputs[i] = streamOutput;streamOutputMap.put(outEdge, streamOutput);}// 创建OperatorChain内部算子之间的连接List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());this.chainEntryPoint = createOutputCollector(containingTask,configuration,chainedConfigs,userCodeClassloader,streamOutputMap,allOps,containingTask.getMailboxExecutorFactory());if (operatorFactory != null) {WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();// 创建headOperatorheadOperator = StreamOperatorFactoryUtil.createOperator(operatorFactory,containingTask,configuration,output);headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK,output.getWatermarkGauge());} else {headOperator = null;}allOps.add(headOperator);this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);success = true;}finally {// 如果创建不成功,则关闭StreamOutputs中的RecordWriterOutput// 这里防止内存泄漏if (!success) {for (RecordWriterOutput<?> output : this.streamOutputs) {if (output != null) {output.close();}}}}
}

OperatorChain作用小结

当OperatorChain创建完成后,就能正常接收StreamTaskInput中的数据元素了。在OperatorChain内部算子之间进行数据传递和处理,最终通过RecordWriterOutput组件将处理完成的数据元素发送到网络中,供下游的Task实例使用。

对于OperatorChain内部Output接口的实现,这里暂不展开。

 

3.创建RecordWriterOutput

RecordWriterOutput用于将数据输出到网络指定位置。

OperatorChain.createStreamOutput()逻辑如下:

  1. 获取输出边的OutputTag标签,判断当前Stream节点输出边是否为旁路输出,即在DataStream API中是否使用了旁路输出的相关方法。
  2. 返回RecordWriterOutput。RecordWriterOutput中包含RecordWriter组件,最终会通过RecordWriter将算子链处理完成的数据写入网络。
private RecordWriterOutput<OUT> createStreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,StreamEdge edge,StreamConfig upStreamConfig,Environment taskEnvironment) {// 获取OutputTagOutputTag sideOutputTag = edge.getOutputTag(); // 获取数据序列化器TypeSerializerTypeSerializer outSerializer = null;// 如果StreamEdge指定了OutputTagif (edge.getOutputTag() != null) {// 则进行边路输出outSerializer = upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader());} else {// 正常输出outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());}// 返回创建的RecordWriterOutput实例return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}

 

StreamRecord将数据输出的逻辑

在RecordWriterOutput.collect()方法中定义了StreamRecord数据的输出逻辑,实际上是调用pushToRecordWriter()方法将数据写入RecordWriter,最终通过RecordWriter组件进行数据元素的网络输出

public void collect(StreamRecord<OUT> record) {if (this.outputTag != null) {return;}pushToRecordWriter(record);
}

 

pushToRecordWriter发送数据

  1. 调用serializationDelegate.setInstance()方法,对接入的数据元素进行序列化操作,将数据元素转换成二进制格式。
  2. 调用recordWriter.emit()方法通过RecordWriter组件将serializationDelegate中序列化后的二进制数据输出到下游网络中。
//RecordWriterOutput.pushToRecordWriter()
private <X> void pushToRecordWriter(StreamRecord<X> record) {serializationDelegate.setInstance(record);try {recordWriter.emit(serializationDelegate);}catch (Exception e) {throw new RuntimeException(e.getMessage(), e);}
}

 
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/520391.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字符函数

1.字符分类函数 专门做字符分类的函数&#xff0c;都包含一个头文件#include <ctype.h> islower() 是一个用于判断字符是否为小写字母的函数。 通常情况下&#xff0c;如果一个字符是小写字母&#xff0c;则 islower() 函数会返回 true 或者一个表示真的值&#xff08…

【变量提升】关于JavaScript变量提升的理解,它导致了什么问题?

&#x1f601; 作者简介&#xff1a;一名大四的学生&#xff0c;致力学习前端开发技术 ⭐️个人主页&#xff1a;夜宵饽饽的主页 ❔ 系列专栏&#xff1a;JavaScript小贴士 &#x1f450;学习格言&#xff1a;成功不是终点&#xff0c;失败也并非末日&#xff0c;最重要的是继续…

大模型笔记:最少到最多提示过程 (Least to Most prompting, LtM)

LEAST-TO-MOST PROMPTING ENABLES COMPLEX REASONING IN LARGE LANGUAGE MODELS 2023 ICLR 1 概述 进一步发展维链提示过程 (CoT prompting) 分为两个阶段&#xff1a; 第一阶段&#xff1a;向语言模型提出查询&#xff0c;将问题分解成子问题。第二阶段&#xff1a;再次向语…

Z Potentials | 星爵,他的征途不止向量数据库

纵观过去几十年的科技发展史&#xff0c;每一代新的技术架构的出现往往都伴随着新的数据范式的出现&#xff0c;也催生了多家百亿到千亿美金数据平台的诞生。如果说 2023 年科技领域的关键词是 LLM&#xff0c;那么数据库领域的关键词一定非向量数据库莫属。向量数据库是一种专…

SpringMVC拦截器和过滤器执行顺序及区别

拦截器&#xff08;Inteceptor&#xff09;和过滤器&#xff08;Filter&#xff09;执行顺序&#xff1f; 拦截器和过滤器区别&#xff1f; 1、拦截次数不同&#xff1a; 过滤器&#xff1a;一次请求只能被一个过滤器拦截一次&#xff0c;它们按照在web.xml中的声明顺序依次执…

IJCAI23 - Continual Learning Tutorial

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 本篇 Tutorial 主要介绍了 CL 中的一些基本概念以及一些过往的方法。 Problem Definition Continual Learning 和 Increm…

DataX及使用

DataX及使用 【一】DataX概述【二】DataX架构原理【1】设计理念【2】框架设计【3】运行流程【4】调度决策思路【5】DataX和Sqoop对比 【三】DataX部署【四】DataX上手【1】使用概述【2】配置文件格式【3】同步Mysql数据到HDFS 【五】DataX整合Springboot 【一】DataX概述 Data…

C++面试宝典【配文档,全方面学习】

原word文档[链接: https://pan.baidu.com/s/1CKnm7vHDmHSDskAgxgZgKA?pwdr4wv 提取码: r4wv 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 --来自百度网盘超级会员v5的分享] 一、C / C基础 1、简述C的内存分区&#xff1f; 一个C、C程序的内存分区主要有5个…

【李沐精读系列】GPT、GPT-2和GPT-3论文精读

论文&#xff1a; GPT&#xff1a;Improving Language Understanding by Generative Pre-Training GTP-2&#xff1a;Language Models are Unsupervised Multitask Learners GPT-3&#xff1a;Language Models are Few-Shot Learners 参考&#xff1a;GPT、GPT-2、GPT-3论文精读…

Spring Boot异常处理和单元测试

1.SpringBoot异常处理 1.1.自定义错误页面 SpringBoot默认的处理异常的机制&#xff1a;SpringBoot 默认的已经提供了一套处理异常的机制。一旦程序中出现了异常 SpringBoot 会向/error 的 url 发送请求。在 springBoot 中提供了一个叫 BasicErrorController 来处理/error 请…

LiveNVR监控流媒体Onvif/RTSP功能-支持云端录像监控视频集中存储录像回看录像计划配置NVR硬件设备录像回看

LiveNVR支持云端录像监控视频集中存储录像回看录像计划配置NVR硬件设备录像回看 1、流媒体服务软件2、录像回看3、查看录像3.1、时间轴视图3.2、列表视图 4、如何分享时间轴录像回看&#xff1f;5、iframe集成示例7、录像计划7、相关问题7.1、录像存储位置如何配置&#xff1f;…

阿里云和腾讯云区别价格表,云服务器费用对比2024年最新

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器61元一年&#xff0c;2核2G3M、2核4G、4核8G、4核16G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配…