深入理解 Flink(三)Flink 内核基础设施源码级原理详解

Hadoop 生态各大常见组件的 RPC 技术实现

在这里插入图片描述

Flink RPC 网络通信框架 Akka 详解

在这里插入图片描述
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责进行通信的组件。
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用阻塞方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor(谁生的谁养)。
5、每一个 ActorSystem 和 Actor 都在启动的时候会给定一个 name,如果要从 ActorSystem 中,获取一个 Actor,则通过以下的方式来进行 Actor 的获取:

akka.tcp://actorsystem_name@bigdata02:9527/user/actor_name 

6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。

actorRef = actorSystem.actorOf("akka.tcp://actorsystem_name@bigdata02:9527/user/actor_name")
// 获取和对方 actor 进行通信的一个 actorRef 对象,类似于一个本地调用,但事实上,actorRef 和 对方actor 的通信细节被封装了。
actorRef = actorSystem.actorOf("schema://actorsystem_name@hostname:port/user/actor_name")
actorRef.getNow()

7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回调返回处理结果。

深入理解 Flink RPC 网络通信框架

Flink RPC 采用了和 Akka 一样的一种抽象,底层是基于 Akka 来实现。Flink RPC 其实是封装了 Akka 但是上层抽象其实和 Akka 的工作机制是一样的。Flink 的 RPC 网络通信框架的底层依然使用 Akka Actor Model 模型设计实现,大致实现和 Spark RPC 差不多。
Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个:
在这里插入图片描述

简单概况

1、RpcGateway 路由,RPC 的老祖宗。各种其他 RPC 服务组件,都是 RpcGateWay 的子类,类似于 Hadoop 中的通信协议 Protocol。
2、RpcEndpoint 业务逻辑载体,对应的 Actor 的封装。
3、RpcService 对应 ActorSystem 的封装,类似于 Spark 中的 RpcEnv。
4、RpcServer 为 RpcService(ActorSystem)和 RpcEndpoint(Actor)之间的粘合层

继承关系图

在这里插入图片描述
四个比较重要的子类:

  • TaskExecutor 集群中从节点中最重要的角色,负责资源管理。
  • Dispatcher 主节点中的一个工作角色,负责 job 调度执行。
  • JobMaster 应用程序中的主控程序,类似于 Spark 中的 Driver 的作用,或者 MapReduce 中的 MRAppMaster。
  • ResourceManager 集群中的主节点 JobManager 中的负责资源管理的角色,和 TaskExecutor 一起构成资源管理的主从架构。

这四个组件的任何一个组件的实例对象创建成功之后,都会要调用 start() 去启动这个 RpcEndpoint,然后就会去执行他的 RpcEndpoint 的 onStart() 方法。一般来说,对应的 RpcEndpoint 组件都会重写,在这些 RpcEndpoint 组件启动的时候,一些重要的逻辑,都有可能被放在这个 onStart() 生命周期方法里。

关于 Flink Standalone 集群
逻辑概念:JobManager + TaskManager
物理概念:ClusterEntryPoint(ResourceManager) + TaskManagerRunner(TaskExecutor)
主节点 ClusterEntryPoint 的内部其实有四种重要的组件:

  • ResourceManager
  • Dispatcher
  • RestServer
  • JobMaster

例如,在 TaskExecutor 的内部,持有 ResourceManager 的一个 Gateway 对象,当 TaskExecutor 需要给 ResourceManager 的时候,就通过 ResourceManagerGateWay 给 ResourceManager 发送消息。

Flink RpcEndpoint

JobManager 的 ResourceManager

ResourceManager 的职责就是帮助 主节点 JobManager 完成从节点 TaskManager 的管理和资源的管理和分配等工作。
在这里插入图片描述

TaskManager 的 TaskExecutor

Flink Standalone 集群是一个主从架构,主节点叫做 JobManager,从节点叫做 TaskManager。
在这里插入图片描述
这个 TaskExecutor 是存在于 TaskManager 的内部,真正完成资源提供和分配,接收任务和执行等相关工作。这个角色的意义更等同于 Spark 中的 Worker, YARN 集群中的 NodeManager。

Flink 核心工作组件整体架构抽象

在这里插入图片描述

Flink on YARN 的三种运行模式

在这里插入图片描述
Flink 通过 YARN 的接口实现了自己的 ApplicationMaster。当在 YARN 中部署了Flink,YARN 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 ApplicationMaster)和 TaskManager。

Flink On YARN 有三种模式:
在这里插入图片描述

  • Session 模式:在 YARN 中初始化一个 Flink 集群,开辟指定的资源,之后我们提交的 Flink Job 都在这个 Flink yarn-session 中,也就是说不管提交多少个 job,这些 job 都会共用开始时在 YARN 中申请的资源。这个 Flink 集群会常驻在 YARN 集群中,除非手动停止。
  • Per-Job 模式:在 YARN 中,每次提交 job 都会创建一个新的 Flink 集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。 所以每个 Job 执行完毕,Flink 集群关闭,释放资源。
  • Application 模式:Flink-1.11 引入,Client 需要做的事情(main 方法的执行)转移到 JobManager中,多个 env.execute() 视为同一个 Application,相比 Per-Job 模式不用启动多个 Cluster。

在这里插入图片描述

./bin/flink run --target yarn-session # Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-per-job # Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run-application --target yarn-application # Submission spinning up Flink YARN cluster in Application Mode

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html

Flink On YARN 不同的模式,其实入口是不一样的,总的来说是:ClusterEntryPoint,ClusterEntryPoint 是 Flink 集群模式的入口基类,它的实现类结构如下:
在这里插入图片描述
注意:1.14 版本之后,Mesos 的支持已经被移除。(Mesos 背后的商业化公司 Mesosphere 于 2023 年破产倒闭)
YARN 模式 和 Standalone 模式最大的区别就是:

  • Standalone 模式,已经提前把 ClusterEntrypoint 和 TaskManagerRunner 启动好了。集群的资源总量是固定的。
  • Flink On YARN 模式中,在 YARN 集群中,申请到一个 Container 用来启动一个 SessionClusterEntrypoint,然后动态申请足够数量的 Container 来启动 TaskManagerRunner 来运行 Task。

Flink 高可用服务 HighAvailabilityServices

在这里插入图片描述
在 Flink 的内部,需要保证高可用服务的有:ResourceManager,Dispatcher,JobManager,WebMonitorEndpoint 四大组件。

ZooKeeperHaServices 内部最重要的两个方法:

public class ZooKeeperHaServices extends AbstractHaServices {@Overrideprotected LeaderElectionService createLeaderElectionService(String leaderName) {// 创建选举服务return ZooKeeperUtils.createLeaderElectionService(client, configuration, leaderName);}@Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String leaderName) {// 创建监听服务return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, leaderName);}
}

其次重要的三个方法:

public class ZooKeeperHaServices extends AbstractHaServices {@Overridepublic CheckpointRecoveryFactory createCheckpointRecoveryFactory() {// TODO_MA 注释:return new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor);}@Overridepublic JobGraphStore createJobGraphStore() throws Exception {// TODO_MA 注释:JobGraphStore + ExecutionGraphStorereturn ZooKeeperUtils.createJobGraphs(client, configuration);}@Overridepublic RunningJobsRegistry createRunningJobsRegistry() {// TODO_MA 注释:return new ZooKeeperRunningJobsRegistry(client, configuration);}
}

Flink 选举服务 LeaderElectionService 和监听 LeaderRetrievalService 机制

  • LeaderElectionService : 用来做选举的服务,基于 ZK 实现,真正实现类的名字叫做: DefaultLeaderElectioinService。它的内部通过一个 选举驱动器 LeaderElectionDriver 来完成。LeaderElectionDriver 的内部其实通过 curator 框架提供的一个选举组件:LeaderLatch 来负责进行选举。
  • Flink 的选举和监听机制,都是依托于 Curator 框架的 API 进行封装提供了的实现,具体涉及到的实现类包括:LeaderContender(用于竞选)和 LeaderElectionService(选举服务)和 LeaderRetrievalService(监听服务)。
  • Curator 对应的监听 API

在这里插入图片描述

LeaderElectionService 接口定义

public interface LeaderElectionService {// 启动选举,启动方法将竞争者作为参数void start(LeaderContender contender) throws Exception;// 停止void stop() throws Exception;// 确认void confirmLeadership(UUID leaderSessionID, String leaderAddress);// 判断是否拥有指定 session 下的 leadershipboolean hasLeadership(@Nonnull UUID leaderSessionId);
}

LeaderContender 是 LeaderElectionService 中的参与选举的竞选者。它有四种实现(1.14):
在这里插入图片描述
也即前文中提到的需要保证高可用服务:ResourceManager,Dispatcher,JobManager,WebMonitorEndpoint 四大组件。这四个组件中组合了 LeaderElectionService,同时 LeaderElectionService 也组合了 LeaderContender(你中有我,我中有你)。

LeaderElectionService 选举实现

LeaderElectionService 实现了 LeaderElectionEventHandler 接口的两个方法:onGrantLeadership、onRevokeLeadership。
LeaderElectionService 调用 leaderElectionService.start(this) 开始执行选举,最终通过 ZooKeeperLeaderElectionDriver 实现选举;
ZooKeeperLeaderElectionDriver 实现了 LeaderLatchListener 接口的 isLeader、notLeader 方法,监听到 zookeeper 的对应事件后触发。
isLeader、notLeader 方法的内部,其实是调用 LeaderElectionService 实现的 LeaderElectionEventHandler 接口的两个方法:onGrantLeadership、onRevokeLeadership;
而上述两个方法最终会调用 LeaderContender 的 grantLeadership、revokeLeadership 方法。

// LeaderLatch 基于分布式锁实现的一个选举类
// NodeCache 监听类
public ZooKeeperLeaderElectionDriver(....){// 当 当前组件 选举成功,则回调 this 的 isLeader() 方法// 当 当前组件 没有选举成功,则回到 this 的 notLeader() 方法leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));leaderLatch.addListener(this);leaderLatch.start();// 当监听响应,则会回调 this 的 nodeChanged() 方法cache = new NodeCache(client, leaderPath);cache.getListenable().addListener(this);cache.start();
}

LeaderRetrievalService 接口定义

public interface LeaderRetrievalService {// 开启监听void start(LeaderRetrievalListener listener) throws Exception;// 结束监听void stop() throws Exception;
}
public interface LeaderRetrievalListener {// 监听回调void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);void handleError(Exception exception);
}

LeaderRetrievalService 实现了 LeaderRetrievalEventHandler 接口的 notifyLeaderAddress 方法。
LeaderRetrievalService 通过 start(LeaderRetrievalListener) 方法开启监听,最终通过 ZookeeperLeaderRetrievalDriver 实现监听响应。
当发生事件响应的时候,会执行 ZookeeperLeaderRetrievalDriver 的 handleStateChange 方法;最终会在 LeaderRetrievalService 的 notifyLeaderAddress 方法中调用 LeaderRetrievalListener 的同名方法 notifyLeaderAddress。
LeaderRetrievalListener 的实现类:
在这里插入图片描述

小结

上述过程应用到了:监听者模式(观察者模式) + 模板方法模式

Flink 文件/大对象服务 BlobService

在这里插入图片描述
在 Flink 框架中,Flink 提供了一个 BlobService 专门用来提供大文件、对象服务。通俗的说,就是一个文件服务器。存储方式在逻辑上,就是一个 Map,作用是为了集中分发;key 就是 BlobKey,value 就是一个文件。BlobService 接口的定义:

public interface BlobService extends Closeable {PermanentBlobService getPermanentBlobService();TransientBlobService getTransientBlobService();int getPort();
}

BlobService 有两个实现类:
在这里插入图片描述
其中 BlobServer 最为重要,BlobServer 的实现如下:

public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {// TODO_MA 注释: BlobServer 的内部,启动了一个 BIO 的服务端。用来给 BlobClient 提供服务private final ServerSocket serverSocket;// TODO_MA 注释: 提供存储服务private final BlobStore blobStore;// TODO_MA 注释: Active BlobServerConnection 链接集合private final Set<BlobServerConnection> activeConnections = new HashSet<>();// TODO_MA 注释: 最大链接数,默认 50,可以通过 blob.fetch.num-concurrent 参数进行修改或者配置private final int maxConnections;// TODO_MA 注释: 定时清理任务相关private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes = new ConcurrentHashMap<>();private final long cleanupInterval;private final Timer cleanupTimer;// TODO_MA 注释: 构造方法,详细见代码注释public BlobServer(Configuration config, BlobStore blobStore) throws IOException {// 见代码注释,主要是初始化一些成员变量和一些参数,然后启动一个定时任务,启动一个BIO服务端}// TODO_MA 注释: BlobServer 本身是一个线程public void run() {while(!this.shutdownRequested.get()) {// BlobServer 每接收到一个客户端的链接,就使用一个 线程来专门提供服务BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);// BlobServerConnection 是一个线程,线程启动conn.start();}}
}

Flink 提供了 BlobServer 用来提供文件服务,当然也提供了一个 BlobClient 用来提交请求。这是一个典型的 C/S 架构。BlobClient 的内部,封装了一个 BIO 客户端。在 BlobServer 中,由一个 BlobServerConnection 专门给一个 BlobClient 提供服务。
BlobServerConnection 的结构:

class BlobServerConnection extends Thread {private final Socket clientSocket;private final BlobServer blobServer;public void run() {final InputStream inputStream = this.clientSocket.getInputStream();final OutputStream outputStream = this.clientSocket.getOutputStream();switch(operation) {// 存 文件case PUT_OPERATION:put(inputStream, outputStream, new byte[BUFFER_SIZE]);break;// 取 文件case GET_OPERATION:get(inputStream, outputStream, new byte[BUFFER_SIZE]);break;default:throw new IOException("Unknown operation " + operation);}}// 存文件实现,具体工作机制,看源代码注释private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {}// 取文件实心,具体工作机制,看源代码注释private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {}
}

BlobClient 的结构:

// 客户端
public final class BlobClient implements Closeable {// Socket 客户端private final Socket socket;// 构造方法public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) throws IOException {Socket socket = new Socket();socket.connect();}// 文件上传服务public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {}public static List<PermanentBlobKey> uploadFiles(InetSocketAddress serverAddress, Configuration clientConfig, JobIDjobId,List<Path> files) throws IOException {}// 文件下载服务static void downloadFromBlobServer(@Nullable JobID jobId, BlobKey blobKey, File localJarFile, InetSocketAddressserverAddress, Configuration blobClientConfig, int numFetchRetries) throws IOException {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/336133.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PCL 计算异面直线的距离

目录 一、算法原理二、代码实现三、结果展示四、相关链接本文由CSDN点云侠原创,PCL 计算异面直线的距离,爬虫自重。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 设置直线 A B AB A

2024年1月9日

2024年1月9日09:26:57待在工作室玩千恋万花和登录PTA练习习题 2024年1月9日09:28:02判断素数肯定会成为考试的关键点之一 2024年1月9日15:13:49完成java的复习 2024年1月9日15:16:41判断反馈类型 2024年1月9日15:20:29行列式求系数通过沙路法展开得到 2024年1月9日15:21:1…

浅析ARMv8体系结构:Memory Type

文章目录 内存类型概述Normal内存属性CacheabilityShareability Device内存属性GatheringReorderingEarly Write Acknowledgement 相关参考 内存类型概述 ARMv8内存模型将内存分成了Normal和Device两种类型&#xff0c;不同的内存类型支持的属性也存在差异&#xff0c;其中&am…

HTML---JQurey的基本使用

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 本章目标 &#xff08;1&#xff09;能够搭建jQuery开发环境 &#xff08;2&#xff09;使用ready( )方法加载页面、掌握jQuery语法 使用addClass( )方法和css( )方法为元素添加CSS样式使用n…

文心一言 VS 讯飞星火 VS chatgpt (175)-- 算法导论13.3 4题

四、用go语言&#xff0c;Teach 教授担心 RB-INSERT-FIXUP可能将 T.nil.color 设为 RED&#xff0c;这时&#xff0c;当 z 为根时第1行的测试就不会让循环终止。通过讨论 RB-INSERT-FIXUP永远不会将 T.nil.color 设置为 RED&#xff0c;来说明这位教授的担心是没有必要的。 文…

苍穹外卖Day01——解决总结1中存在的问题

前序章节&#xff1a; 苍穹外卖Day01——总结1 解决总结1中存在的问题 1. 什么是JWT2. POJO、Entity、VO、DTO3. Nginx&#xff08;反向代理&#xff09;4. Data注解 1. 什么是JWT JWT&#xff08;JSON Web Token&#xff09;是一种用于在网络应用间传递信息的开放标准&#…

聊聊 Java 集合框架中的 ArrayList

其实 Java 集合框架也叫做容器&#xff0c;主要由两大接口派生而来&#xff0c;一个是 collection,主要存放对象的集合。另外一个是Map, 存储着键值对&#xff08;两个对象&#xff09;的映射表。 下面就来说说 List接口&#xff0c;List存储的元素是有序、可重复的。其下有三个…

影视视频知识付费行业万能通用网站系统源码,三网合一,附带完整的安装部署教程

在数字化时代&#xff0c;知识付费行业逐渐成为主流。人们对高质量内容的需求日益增长&#xff0c;越来越多的人愿意为有价值的知识和信息服务付费。为了满足这一市场需求&#xff0c;罗峰给大家分享一款全新的影视视频知识付费网站系统源码&#xff0c;为用户提供一站式的知识…

Real Fire Smoke

资产使用高分辨率的动画Spritesheet,并结合了Shuriken粒子系统。具有3种风格的火焰和爆炸。 效果分为7类。 主要特点: - 3种火 - 预制板共计80块 - 5种爆炸声 - 3个循环的火灾声音 - 7张火焰和烟雾的精灵图 - 8x8 帧的 spritesheet,分辨率在 2048 到 8192 像素之间 效果类别…

flask+mysql徐州市天气信息可视化分析系统-计算机毕业设计源码04600

摘 要 信息化社会内需要与之针对性的信息获取途径&#xff0c;但是途径的扩展基本上为人们所努力的方向&#xff0c;由于站在的角度存在偏差&#xff0c;人们经常能够获得不同类型信息&#xff0c;这也是技术最为难以攻克的课题。针对天气信息等问题&#xff0c;对天气信息进行…

笔记本摄像头模拟监控推送RTSP流

使用笔记本摄像头模拟监控推送RTSP流 一、基础安装软件准备 本文使用软件下载链接:下载地址 FFmpeg软件: Download ffmpeg 选择Windows builds by BtbN 一个完整的跨平台解决方案&#xff0c;用于录制、转换和流式传输音频和视频。 EasyDarwin软件&#xff1a;Download Easy…

开源C语言库Melon:Cron格式解析

本文介绍开源C语言库Melon的cron格式解析。 关于 Melon 库&#xff0c;这是一个开源的 C 语言库&#xff0c;它具有&#xff1a;开箱即用、无第三方依赖、安装部署简单、中英文文档齐全等优势。 Github repo 简介 cron也就是我们常说的Crontab中的时间格式&#xff0c;格式如…