【Flink网络传输】ShuffleMaster与ShuffleEnvironment创建细节与提供的能力

文章目录

  • 一. Taskmanager之间传递数据细节
  • 二. ShuffleService的设计与实现
  • 三. 在JobMaster中创建ShuffleMaster
  • 四. 在TaskManager中创建ShuffleEnvironment
  • 五. 基于ShuffleEnvironment创建ResultPartition
    • 1. 在task启动时创建ResultPartition
    • 2. ResultPartition的创建与对数据的行为
    • 3. 创建ResultSubpartitions与 应用与流或批场景
  • 六. 基于ShuffleEnvironment创建InputGate
    • 1. 在哪里创建的InputGate
    • 2. SingleInputGate的创建和提供的能力
      • 2.1. 创建SingleInputGate
      • 2.2. InputChannel的创建与处理同一个tm的数据或跨tm的数据的能力

一. Taskmanager之间传递数据细节

Flink作业最终会被转换为ExecutionGraph并拆解成Task,在TaskManager中调度并执行,Task实例之间会发生跨TaskManager节点的数据交换,尤其是在DataStream API中使用了物理分区操作的情况。

ResultPartition组件存放中间结果等待下游节点消费:

从ExecutionGraph到物理执行图的转换中可以看出,ExecutionVertex最终会被转换为Task实例运行,在ExecutionGraph中上游节点产生的数据被称为IntermediateResult,物理执行图对应ResultPartition组件。在ResultPartition组件中会根据分区的数量再细分为ResultSubPartition。在ResultSubPartition中主要有BufferConsumer队列,用于本地存储Buffer数据,供下游的Task节点消费使用。

InputChannel读取上游数据

对下游的Task实例来讲,主要依赖InputGate组件读取上游数据,在InputGate组件中InputChannel和上游的ResultSubPartition数量相同(发送逻辑是?起到shuffle的作用)。
因此RecordWriter向ResultPartition中的ResultSubPartition写入Buffer数据,就是在向下游的InputChannel写入数据,因为最终会从ResultSubPartition的队列中读取Buffer数据再经过TCP网络连接发送到对应的InputChannel中。

在这里插入图片描述

ResultPartition(存储中间结果集)和InputGate(读取中间结果集)组件的创建

TaskManager接收到JobManager的Task创建请求时,会根据TaskDeploymentDescriptor中的参数创建并初始化ResultPartition和InputGate组件。Task启动成功并开始接入数据后,使用ResultPartition和InputGate组件实现上下游算子之间的跨网络数据传输

ShuffleMaster管理ResultPartition和InputGate。

在TaskManager实例中,主要通过ShuffleEnvironment统一创建ResultPartition和InputGate组件。在JobMaster中也会创建ShuffleMaster统一管理和监控作业中所有的ResultPartition和InputGate组件

 

因此在介绍ResultPartition和InputGate之前,我们先了解一下ShuffleMaster和ShuffleEnvironment的主要作用和创建过程。

 

二. ShuffleService的设计与实现

如图,创建ShuffleMaster和ShuffleEnvironment组件主要依赖ShuffleServiceFactory实现。同时为了实现可插拔的ShuffleService服务,ShuffleServiceFactory的实现类通过Java SPI的方式加载到ClassLoader中,即通过ShuffleServiceLoader从配置文件中加载系统配置的ShuffleServiceFactory实现类,因此用户也可以自定义实现Shuffle服务。

基于SPI的方式加载ShuffleServiceFactory

在JobManager内部创建JobManagerRunner实例的过程中会创建ShuffeServiceLoader,用于通过Java SPI服务的方式加载配置的ShuffleServiceFactory,同时在TaskManager的TaskManagerServices中创建ShuffeServiceLoader并加载ShuffleServiceFactory。

ShuffleServiceFactory提供了创建ShuffleMaster和ShuffleEnvironment的能力
ShuffleServiceFactory接口定义中包含创建ShuffleMaster和ShuffleEnvironment的方法。Flink提供了基于Netty通信框架实现的NettyShuffleServiceFactory,作为ShuffleServiceFactory接口的默认实现类。

ShuffleEnvironment组件提供了创建Task实例中ResultPartition和InputGate组件的方法,同时Flink中默认提供了NettyShuffleEnvironment实现。

ShuffleMaster组件实现了对ResultPartition和InputGate的注册功能

ShuffleMaster组件实现了对ResultPartition和InputGate的注册功能,同时每个作业都有ShuffleMaster管理当前作业的ResultPartition和InputGate等信息,Flink中提供了NettyShuffleMaster默认实现。

ShuffleService UML关系图

在这里插入图片描述

 

三. 在JobMaster中创建ShuffleMaster

创建ShuffleMaster,ShuffleEnvironment的大致过程

  • 通过ShuffleServiceFactory可以创建ShuffleMaster和ShuffleEnvironment服务,其中ShuffleMaster主要用在JobMaster调度和执行Execution时,维护当前作业中的ResultPartition信息,例如ResourceID、ExecutionAttemptID等
  • 紧接着JobManager会将ShuffleMaster创建的NettyShuffleDescriptor参数信息发送给对应的TaskExecutor实例,在TaskExecutor中就会基于NettyShuffleDescriptor的信息,通过ShuffleEnvironment组件创建ResultPartition、InputGate等组件。

分配slot资源,并将分区信息注册到ShuffleMaster中

如代码清单,在JobMaster开始向Execution分配Slot资源时,会通过分配的Slot计算资源获取TaskManagerLocation信息,然后调用Execution.registerProducedPartitions()方法将分区信息注册到ShuffleMaster中。

CompletableFuture<Execution> allocateResourcesForExecution(SlotProviderStrategy slotProviderStrategy,LocationPreferenceConstraint locationPreferenceConstraint,@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {return allocateAndAssignSlotForExecution(slotProviderStrategy,locationPreferenceConstraint,allPreviousExecutionGraphAllocationIds).thenCompose(slot -> registerProducedPartitions(slot.getTaskManagerLocation()));
}

Execution.registerProducedPartitions()方法逻辑如下。

  1. 创建ProducerDescriptor对象,其中包含了分区生产者的基本信息,例如网络连接地址和端口以及TaskManagerLocation信息
  2. 获取当前ExecutionVertex节点对应的IntermediateResultPartition信息,在IntermediateResultPartition结构中包含了ExecutionVertex、IntermediateResultPartitionID以及ExecutionEdge等逻辑分区信息。
  3. 遍历IntermediateResultPartition列表,将IntermediateResultPartition转换为PartitionDescriptor数据结构,然后调用ExecutionGraph的ShuffleMaster服务,将创建的PartitionDescriptor和ProducerDescriptor注册到ShuffleMaster服务中
  4. 根据ShuffleDescriptor创建ResultPartitionDeploymentDescriptor并添加到partitionRegistrations集合中。(producedPartitions信息会被TaskManager的ShuffleEnvironment用于创建ResultPartition和InputGate等组件。
static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(ExecutionVertex vertex,TaskManagerLocation location,ExecutionAttemptID attemptId,boolean sendScheduleOrUpdateConsumersMessage) {// 创建ProducerDescriptorProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);// 获取当前节点的partition信息Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =new ArrayList<>(partitions.size());// 向ShuffleMaster注册partition信息for (IntermediateResultPartition partition : partitions) {PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);int maxParallelism = getPartitionMaxParallelism(partition);// 调用ShuffleMaster注册partitionDescriptor和producerDescriptorCompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture = vertex.getExecutionGraph().getShuffleMaster().registerPartitionWithProducer(partitionDescriptor, producerDescriptor);Preconditions.checkState(shuffleDescriptorFuture.isDone(), "ShuffleDescriptor future is incomplete.");// 创建ResultPartitionDeploymentDescriptor实例CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration = shuffleDescriptorFuture.thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(partitionDescriptor,shuffleDescriptor,maxParallelism,sendScheduleOrUpdateConsumersMessage));// 添加到partitionRegistrations集合中partitionRegistrations.add(partitionRegistration);}// 转换存储结构return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions =new LinkedHashMap<>(partitions.size());rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));return producedPartitions;});
}

 

四. 在TaskManager中创建ShuffleEnvironment

从fromConfiguration创建并启动shuffleEnvironment

在TaskManagerServices的启动过程中会创建并启动ShuffleEnvironment。如代码,在TaskManagerServices.fromConfiguration()方法中包含创建和启动ShuffleEnvironment的过程。和ShuffleMaster的创建过程一样,在TaskManagerServices.createShuffleEnvironment()方法中,也会通过Java SPI的方式加载ShuffleServiceFactory实现类,然后创建ShuffleEnvironment。

public static TaskManagerServices fromConfiguration(...)  throws Exception {。。。// 调用createShuffleEnvironment创建ShuffleEnvironment
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(taskManagerServicesConfiguration,taskEventDispatcher,taskManagerMetricGroup);
// 启动shuffleEnvironment
final int dataPort = shuffleEnvironment.start();
...
}

 

NettyShuffleEnvironment的创建过程,以及它提供的能力:

在Flink中默认提供基于Netty通信框架实现的NettyShuffleServiceFactory实现类,创建NettyShuffleEnvironment。
ShuffleEnvironment控制了TaskManager中网络数据交换需要的全部服务和组件信息,包括创建上下游数据传输的ResultPartition、SingleInput以及用于网络栈中Buffer数据缓存的NetworkBufferPool等

这里了解NettyShuffleEnvironment的创建过程:

  1. 从NettyShuffleEnvironmentConfiguration参数中获取Netty相关配置,例如TransportType、InetAddress、serverPort以及numberOfSlots等信息。
  2. 创建ResultPartitionManager实例,注册和管理TaskManager中的ResultPartition信息,并提供创建ResultSubpartitionView的方法,专门用于消费ResultSubpartition中的Buffer数据
  3. 创建FileChannelManager实例,指定配置中的临时文件夹,然后创建并获取文件的FileChannel。对于离线类型的作业,会将数据写入文件系统,再对文件进行处理,这里的实现和MapReduce算法类似(ing)。
  4. 创建ConnectionManager实例,主要用于InputChannel组件。
    InputChannel会通过ConnectionManager创建PartitionRequestClient,实现和ResultPartition之间的网络连接。ConnectionManager会根据NettyConfig是否为空,选择创建NettyConnectionManager还是LocalConnectionManager。
  5. 创建NetworkBufferPool组件,用于向ResultPartition和InputGate组件提供Buffer内存存储空间,实际上就是分配和管理MemorySegment内存块
  6. 向系统中注册ShuffleMetrics,用于跟踪Shuffle过程的监控信息
  7. 创建ResultPartitionFactory工厂类,用于创建ResultPartition。
  8. 创建SingleInputGateFactory工厂类,用于创建SingleInputGate。

将以上创建的组件或服务作为参数来创建NettyShuffleEnvironment。

NettyShuffleServiceFactory.createNettyShuffleEnvironment()
static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration config,ResourceID taskExecutorResourceId,TaskEventPublisher taskEventPublisher,MetricGroup metricGroup) {// 检查参数都不能为空
。。。// 获取Netty相关的配置参数NettyConfig nettyConfig = config.nettyConfig();// 创建ResultPartitionManager实例ResultPartitionManager resultPartitionManager = new ResultPartitionManager();// 创建FileChannelManager实例FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);// 创建ConnectionManager实例ConnectionManager connectionManager = nettyConfig != null ?new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig): new LocalConnectionManager();// 创建NetworkBufferPool实例NetworkBufferPool networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(),config.networkBufferSize(),config.networkBuffersPerChannel(),config.getRequestSegmentsTimeout());// 注册ShuffleMetrics信息registerShuffleMetrics(metricGroup, networkBufferPool);// 创建ResultPartitionFactory实例ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(resultPartitionManager,fileChannelManager,networkBufferPool,config.getBlockingSubpartitionType(),config.networkBuffersPerChannel(),config.floatingNetworkBuffersPerGate(),config.networkBufferSize(),config.isForcePartitionReleaseOnConsumption(),config.isBlockingShuffleCompressionEnabled(),config.getCompressionCodec());// 创建SingleInputGateFactory实例SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(taskExecutorResourceId,config,connectionManager,resultPartitionManager,taskEventPublisher,networkBufferPool);// 最后返回NettyShuffleEnvironmentreturn new NettyShuffleEnvironment(taskExecutorResourceId,config,networkBufferPool,connectionManager,resultPartitionManager,fileChannelManager,resultPartitionFactory,singleInputGateFactory);
}

至此,创建NettyShuffleEnvironment的过程就基本完成了,接下来TaskManager会接受JobMaster提交的Task申请(这是一个被动过程?为了开口子接收其他task的数据?),然后通过ShuffleEnvironment为Task实例创建ResultPartition和InputGate组件。创建这些组件的信息来自ShuffleMaster中注册的ResultPartition和ExecutionEdge等信息。

 
接下来我们具体了解如何通过ShuffleEnvironment创建ResultPartition和InputGate两个重要组件。

 

五. 基于ShuffleEnvironment创建ResultPartition

1. 在task启动时创建ResultPartition

task启动时就创建ResultPartition

当TaskManager接收到JobMaster提交的Task作业申请后,就会创建并启动Task线程。
如代码所示,Task的构造器方法包含了NettyShuffleEnvironment创建ResultPartitionWriter的实现,可以理解为在创建Task线程的时候就通过ShuffleEnvironment创建了ResultPartition

反压控制:动态控制数据向下游输出

创建好ResultPartitionWriter后,对ResultPartitionWriter进行装饰,目的是让ResultPartition可以向下游节点发送ResultPartition是否可消费的信息,以便实现动态控制ResultPartitionWriter内的数据输出

org.apache.flink.runtime.taskmanager.Task
public Task(...){
final ShuffleIOOwnerContext taskShuffleContext = shuffleEnvironment.createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());
// 创建ResultPartitonWriter
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(taskShuffleContext,resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});
// 对ResultPartiton进行装饰
this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(resultPartitionDeploymentDescriptors,resultPartitionWriters,this,jobId,resultPartitionConsumableNotifier);}

2. ResultPartition的创建与对数据的行为

如代码,接着看创建ResultPartition的主要逻辑。

  1. 根据resultPartitionDeploymentDescriptors的大小初始化ResultPartition数组。
  2. 通过resultPartitionFactory创建ResultPartition。
  3. 调用registerOutputMetrics()方法注册resultPartitions相关的监控指标信息。
  4. 返回创建的ResultPartition数组。
NettyShuffleEnvironment.createResultPartitionWriters()
public Collection<ResultPartition> createResultPartitionWriters(ShuffleIOOwnerContext ownerContext,Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) {synchronized (lock) {Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has already been shut down.");// 根据resultPartitionDeploymentDescriptors创建ResultPartition数组ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];int counter = 0;// 遍历ResultPartitionDeploymentDescriptor创建ResultPartitionfor (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {resultPartitions[counter++] = resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd);}registerOutputMetrics(config.isNetworkDetailedMetrics(), ownerContext.getOutputGroup(), resultPartitions);return  Arrays.asList(resultPartitions);}
}

 

继续了解ResultPartition的创建过程

  1. 判断ResultPartitionType是否为Blocking类型,如果是则需要创建BufferCompressor,用于压缩Buffer数据,即在离线数据处理过程中通过BufferCompressor压缩Buffer数据。
  2. 根据numberOfSubpartitions对应的数量创建ResultSubpartition数组,并存储当前ResultPartition中的ResultSubpartition。
  3. 根据ResultPartitionType参数创建ResultPartition,如果ResultPartitionType是Blocking类型,则创建ReleaseOnConsumptionResultPartition,即数据消费完便立即释放ResultPartition。否则创建ResultSubpartition,即不会随着数据消费完之后进行释放,适用于流数据处理场景
  4. 调用createSubpartitions()方法创建ResultSubpartition。ResultSubpartition会有ID进行区分,并和InputGate中的InputChannel一一对应
//ResultPartitionFactory.create()
public ResultPartition create(String taskNameWithSubtaskAndId,ResultPartitionID id,ResultPartitionType type,int numberOfSubpartitions,int maxParallelism,FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory)
{BufferCompressor bufferCompressor = null;// 如果ResultPartitionType是Blocking类型,则需要创建BufferCompressor,用于数据压缩if (type.isBlocking() && blockingShuffleCompressionEnabled) {bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);}// 创建ResultSubpartition数组ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];// 根据条件创建ResultPartitionResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()? new ReleaseOnConsumptionResultPartition(taskNameWithSubtaskAndId,id,type,subpartitions,maxParallelism,partitionManager,bufferCompressor,bufferPoolFactory): new ResultPartition(taskNameWithSubtaskAndId,id,type,subpartitions,maxParallelism,partitionManager,bufferCompressor,bufferPoolFactory);// 创建SubpartitionscreateSubpartitions(partition, type, blockingSubpartitionType, subpartitions);LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);return partition;
}

 

3. 创建ResultSubpartitions与 应用与流或批场景

  • 在创建ResultSubpartitions的时候,也会根据ResultPartitionType是否为Blocking类型,选择创建BoundedBlockingPartitions(用于有界批计算处理场景)或PipelinedSubpartition(用于无界流式数据集处理场景)。
  • 在PipelinedSubpartition中会以subpartitions的数组索引作为ResultPartition中的index,也就是说,ResultPartition主要通过index确认数据写入哪个ResultSubPartition。
private void createSubpartitions(ResultPartition partition,ResultPartitionType type,BoundedBlockingSubpartitionType blockingSubpartitionType,ResultSubpartition[] subpartitions) {// 创建ResultSubpartitions.if (type.isBlocking()) {initializeBoundedBlockingPartitions(subpartitions,partition,blockingSubpartitionType,networkBufferSize,channelManager);} else {for (int i = 0; i < subpartitions.length; i++) {subpartitions[i] = new PipelinedSubpartition(i, partition);}}
}

 

六. 基于ShuffleEnvironment创建InputGate

1. 在哪里创建的InputGate

和ResultPartition创建过程相似,Task的初始化过程中也会创建InputGate。如代码,Task构造器方法中涵盖了InputGate的创建逻辑。

final InputGate[] gates = shuffleEnvironment.createInputGates(taskShuffleContext,this,inputGateDeploymentDescriptors).toArray(new InputGate[] {});
this.inputGates = new InputGate[gates.length];
int counter = 0;
for (InputGate gate : gates) {inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter());
}

接下来具体看NettyShuffleEnvironment.createInputGates()的逻辑

  1. 获取networkInputGroup信息,用于创建InputChannelMetrics。
  2. 根据inputGateDeploymentDescriptorsShufflemanager传递的,那这个数量是怎么确定的?ing)数组的大小创建SingleInputGate数组,用于存储SingleInputGate组件。
  3. 根据InputGateDeploymentDescriptor创建SingleInputGate
  4. 注册InputGate的监控信息,并返回SingleInputGate集合。
public Collection<SingleInputGate> createInputGates(ShuffleIOOwnerContext ownerContext,PartitionProducerStateProvider partitionProducerStateProvider,Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {synchronized (lock) {Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has already been shut down.");MetricGroup networkInputGroup = ownerContext.getInputGroup();@SuppressWarnings("deprecation")InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];int counter = 0;//遍历igdd通过singleInputGateFactory创建inputGatefor (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) {SingleInputGate inputGate = singleInputGateFactory.create(ownerContext.getOwnerName(),igdd,partitionProducerStateProvider,inputChannelMetrics);InputGateID id = new InputGateID(igdd.getConsumedResultId(), ownerContext.getExecutionAttemptID());inputGatesById.put(id, inputGate);inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id));inputGates[counter++] = inputGate;}//注册metricregisterInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup,inputGates);return Arrays.asList(inputGates);}
}

 

2. SingleInputGate的创建和提供的能力

2.1. 创建SingleInputGate

继续看SingleInputGateFactory创建SingleInputGate的过程,如代码

  1. 创建createBufferPoolFactory,用于创建LocalBufferPool。通过LocalBufferPool可以为InputGate提供Buffer数据的存储空间,实现本地缓冲接入InputGate中的二进制数据。
  2. 根据结果分区类型和是否支持压缩决定是否创建BufferDecompressor,这里和ResultPartition中的BufferCompressor是对应的,即通过BufferDecompressor解压经过BufferCompressor压缩后的Buffer数据。
  3. 通过InputGateDeploymentDescriptor中的参数BufferCompressor和BufferPoolFactory创建SingleInputGate对象。
  4. 调用createInputChannels()方法创建SingleInputGate中的InputChannels。
  5. 将创建完成的inputGate返回给Task实例。
public SingleInputGate create(@Nonnull String owningTaskName,@Nonnull InputGateDeploymentDescriptor igdd,@Nonnull PartitionProducerStateProvider partitionProducerStateProvider,@Nonnull InputChannelMetrics metrics) {SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory(networkBufferPool,networkBuffersPerChannel,floatingNetworkBuffersPerGate,igdd.getShuffleDescriptors().length,igdd.getConsumedPartitionType());BufferDecompressor bufferDecompressor = null;if (igdd.getConsumedPartitionType().isBlocking() && blockingShuffleCompressionEnabled) {bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);}SingleInputGate inputGate = new SingleInputGate(owningTaskName,igdd.getConsumedResultId(),igdd.getConsumedPartitionType(),igdd.getConsumedSubpartitionIndex(),igdd.getShuffleDescriptors().length,partitionProducerStateProvider,bufferPoolFactory,bufferDecompressor);//创建SingleInputGate中的InputChannels。createInputChannels(owningTaskName, igdd, inputGate, metrics);return inputGate;
}

SingleInputGateFactory.createInputChannels()方法定义了创建指定SingleInputGate对应的InputChannel集合。

  1. 获取ShuffleDescriptor列表,ShuffleDescriptor是在ShuffleMaster中创建和生成的,描述了数据生产者和ResultPartition等信息。
  2. 创建InputChannel数组,最后将其存储到inputGate中。可以看出每个resultPartitionID对应一个InputChannel。
private void createInputChannels(String owningTaskName,InputGateDeploymentDescriptor inputGateDeploymentDescriptor,SingleInputGate inputGate,InputChannelMetrics metrics) {ShuffleDescriptor[] shuffleDescriptors = inputGateDeploymentDescriptor.getShuffleDescriptors();// 创建InputChannelInputChannel[] inputChannels = new InputChannel[shuffleDescriptors.length];ChannelStatistics channelStatistics = new ChannelStatistics();for (int i = 0; i < inputChannels.length; i++) {inputChannels[i] = createInputChannel(inputGate,i,shuffleDescriptors[i],channelStatistics,metrics);ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);}LOG.debug("{}: Created {} input channels ({}).",owningTaskName,inputChannels.length,channelStatistics);
}

2.2. InputChannel的创建与处理同一个tm的数据或跨tm的数据的能力

概述

在SingleInputGateFactory.createInputChannel()方法中定义了创建InputChannel的具体逻辑,
同时会根据ShuffleDescriptor实现类是否为NettyShuffleDescriptor决定创建UnknownInputChannel还是系统内置的LocalInputChannel和RemoteInputChannel。

重点了解LocalInputChannel和RemoteInputChannel的创建过程。

创建内置InputChannel的主要逻辑:

[!NOTE]
判断消费数据的Task实例和数据生产的Task实例是否运行在同一个TaskManager中。这一步主要是在判断producerLocation和consumerLocation是否相等,

  • 如果相等则说明上下游Task属于同一TaskManager,创建的InputChannel就为LocalInputChannel,下游InputChannel不经过网络获取数据
  • 不相等,则说明上下游Task不在同一个TaskManager中,此时创建基于Netty框架实现的RemoteInputChannel,帮助下游Task实例从网络中消费上游Task中的Buffer数据。

在RemoteInputChannel中需要networkBufferPool、connectionManager等组件,对于LocalInputChannel则不需要这些组件。在ShuffleMaster注册分区信息的时候(when:在申请好tm资源后?),创建上下游Task的连接信息,此时会根据Task分配的Slot信息,传入ProducerLocation和ConsumerLocation等配置信息,然后创建不同的InputChannel,从而实现上下游Task的网络连接。

private InputChannel createKnownInputChannel(SingleInputGate inputGate,int index,NettyShuffleDescriptor inputChannelDescriptor,ChannelStatistics channelStatistics,InputChannelMetrics metrics) {ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {// Task实例属于同一个TaskManagerchannelStatistics.numLocalChannels++;return new LocalInputChannel(inputGate,index,partitionId,partitionManager,taskEventPublisher,partitionRequestInitialBackoff,partitionRequestMaxBackoff,metrics);} else {// Task实例属于不同的TaskManagerchannelStatistics.numRemoteChannels++;return new RemoteInputChannel(inputGate,index,partitionId,inputChannelDescriptor.getConnectionId(),connectionManager,partitionRequestInitialBackoff,partitionRequestMaxBackoff,metrics,networkBufferPool);}
}

 

到这里,ResultPartition和InputGate组件就全部创建完毕了。Task实例会将ResultPartition和InputGate组件封装在环境信息中,然后传递给StreamTask。StreamTask获取ResultPartition和InputGate,用于创建StreamNetWorkTaskInput和RecordWriter组件,从而完成Task中数据的输入和输出。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/518423.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++写食堂菜品管理系统

说明:本博文来自CSDN-问答板块,题主提问。 需要:学校拟开发一套食堂菜品管理系统,以便对菜品和同学们的评价进行管理,其中包含如下信息: 商户:商户名称、柜面位置、电话…… 菜品:菜品编号、菜品名称、价格、所属商户…… 学生:注册账号、昵称、电话…… 食堂里的商户…

arguments和剩余参数(...)

1、arguments对象 是函数内部内置的对象&#xff0c;是一个伪数组&#xff0c;包含了调用函数是传入的所有实参。可用来动态获取函数的实参。 function init(a,b,c) {console.log(arguments)}init(1,2,3) 2、剩余函数(...) 获取多余的实参&#xff0c;并形成一个真数组&#xf…

不精确一维搜索:Armijo-Goldstein准则Wolfe-Powell准则

一维搜索/线搜索 1.引言2.内容3.准则思想 1.引言 为了防止迭代过程中函数值 f ( x k ) f(x^k) f(xk) 的下降量不够充分&#xff0c;以至于算法无法收敛到极小值点&#xff0c;必须引入一些更合理的线搜索准则来确保迭代的收敛性。保证每一步迭代充分下降。 2.内容 line sea…

PMP证书:究竟值不值得考?含金量如何?

PMP证书在项目管理领域还是很受关注&#xff0c;但其含金量和是否必须考取一直存在争议。在这里&#xff0c;我们来深入分析&#xff0c;看看PMP证书到底值不值得考&#xff0c;以及背后的原因。 首先&#xff0c;我们要关注的是PMP考试的通过率。根据网络数据&#xff0c;PMP…

Tomcat性能调优

1‍.应用场景/常见内容溢出问题‍ 常见问题为内存溢出&#xff0c;分为堆内存溢出、非堆内存溢出&#xff0c;比较常见的为堆内存溢出&#xff0c;后2类属于非堆内存溢出。 堆溢出&#xff1a; java.lang.OutOfMemoryError:Java heap spcace 原因:项目运行阶段,new的对象过多…

SAP MM学习笔记 - 错误 BMG140 - The material number is longer than the length set

错误 BMG140 - The material number is longer than the length set 品目编号大于长度设置 1&#xff0c;在新规品目的时候&#xff0c;出的错 2&#xff0c;OMSL 品目Code书式变更 IMG path>Logistic general>Material Master>Basic settings>Define output for…

第十五届蓝桥杯青少组STEMA测评SPIKE初级真题试卷 2024年1月

第十五届蓝桥杯青少组STEMA测评SPIKE初级真题试卷 2024年1月 ​​​​​​​ 来自&#xff1a;6547网 http://www.6547.cn/doc/vywur8eics

Windows 内核和 Linux 内核谁更复杂?

Windows 内核和 Linux 内核谁更复杂? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「Linux的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&…

【SQL】1068. 产品销售分析 I

题目描述 leetcode题目&#xff1a;1068. 产品销售分析 I 写法 select Product.product_name, Sales.year, Sales.price from Sales left join Product on Sales.product_id Product.product_id记录细节&#xff1a;加上表名检索效率更高。 -- ERROR: 时间超出限制 > 加…

社科院与杜兰大学金融管理硕士——金融人需要跟风在职读研吗

近年来&#xff0c;国家通过实施一系列政策&#xff0c;鼓励和扶持着职场人士继续深造&#xff0c;不仅提升学历层次&#xff0c;更在综合素质上追求卓越。这些政策的落地生根&#xff0c;为在职读研铺设了宽广的道路&#xff0c;使得越来越多的职场人士心潮澎湃&#xff0c;纷…

Processing基本形状内容和实例

一、Processing的基本形状内容和实例 1.Processing有一组专门绘制基本图形得图案。像线条这样的基本图形可以被连接起来创建更为复杂得形状&#xff0c;例如一片叶子或者一张脸。 2.为了绘制一条直线&#xff0c;我们需要四个参数&#xff0c;两个用于确定初始位置&#xff0c;…

【JavaEE初阶 -- 计算机核心工作机制】

这里写目录标题 1.冯诺依曼体系2.CPU是怎么构成的3.指令表4.CPU执行代码的方式5.CPU小结&#xff1a;6.编程语言和操作系统7. 进程/任务&#xff08;Process/Task&#xff09;8.进程在系统中是如何管理的9. CPU分配 -- 进程调度10.内存分配 -- 内存管理11.进程间通信 1.冯诺依曼…