【Flink系列二十四】Flink HistoryServer 实现原理分析-源码解读

news/2024/11/13 17:57:55/文章来源:https://www.cnblogs.com/slankka/p/18544469

Flink系列二十四 Flink HistoryServer 实现原理

首先,作业停止或者故障时,调用 HistoryServerArchivist 进行归档

public interface HistoryServerArchivist {/*** Archives the given {@link ExecutionGraphInfo} on the history server.** @param executionGraphInfo to store on the history server* @return Future which is completed once the archiving has been completed.*/CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo);
}

调用的入口

protected CompletableFuture<CleanupJobState> jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
}...private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(ExecutionGraphInfo executionGraphInfo) {
}

FsJobArchivist将数据序列化成JSON文本

org.apache.flink.runtime.history.FsJobArchivist#archiveJob

数据会写入: jobmanager.archive.fs.dir 对应的目录

public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {try {FileSystem fs = rootPath.getFileSystem();Path path = new Path(rootPath, jobId.toString());OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {gen.writeStartObject();gen.writeArrayFieldStart(ARCHIVE);for (ArchivedJson archive : jsonToArchive) {gen.writeStartObject();gen.writeStringField(PATH, archive.getPath());gen.writeStringField(JSON, archive.getJson());gen.writeEndObject();}gen.writeEndArray();gen.writeEndObject();} catch (Exception e) {fs.delete(path, false);throw e;}LOG.info("Job {} has been archived at {}.", jobId, path);return path;} catch (IOException e) {LOG.error("Failed to archive job.", e);throw e;}}

查看归档的JSON数据

hdfs dfs -get hdfs://corp.slankka-hdfs.com/application/app-logs/flink/da12f990aba5bcdff710e96c5a409123

查看该数据,其实是一个JSON结构

{"archive": [{"path": "/jobs/overview","json": "..." },{"path": "/jobs/da12f990aba5bcdff710e96c5a409123/config","json": "..."},{"path": "/jobs/da12f990aba5bcdff710e96c5a409123/checkpoints","json": "..."}]
}

HistoryServer 启动后开始扫描归档的数据

public class HistoryServer {void start() throws IOException, InterruptedException {//...executor.scheduleWithFixedDelay(getArchiveFetchingRunnable(), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);}private Runnable getArchiveFetchingRunnable() {return Runnables.withUncaughtExceptionHandler(() -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE);}
}

处理读取到的归档:下载到HistoryServer本地磁盘

可以发现,写入归档和处理归档的代码逻辑在同一个类文件内:FsJobArchivist

org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher#processArchiveprivate void processArchive(String jobID, Path jobArchive) throws IOException {for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive)) {String path = archive.getPath();String json = archive.getJson();File target;if (path.equals(JobsOverviewHeaders.URL)) {target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);} else if (path.equals("/joboverview")) { // legacy pathLOG.debug("Migrating legacy archive {}", jobArchive);json = convertLegacyJobOverview(json);target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);} else {// this implicitly writes into webJobDirtarget = new File(webDir, path + JSON_FILE_ENDING);}...}	
}

上述逻辑会写入 historyserver.web.tmpdir 对应位置,如果未定义,则下载到 java.io.tmpdir

相对路径

HistoryServerArchiveFetcher(...) {this.webDir = checkNotNull(webDir);this.webJobDir = new File(webDir, "jobs");Files.createDirectories(webJobDir.toPath());this.webOverviewDir = new File(webDir, "overviews");
}

API 返回相应的文件

见 History Server Available Requests

本质上都是拉取以及解析对应的文件

router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

Router 相应的请求分为两类:

第一类

  1. Job 归档的JSON文件

第二类:History Server 的 JobManager 以及 TaskManager 的日志链接

  1. /jobs/<jobid>/jobmanager/log-url
  2. /jobs/<jobid>/taskmanagers/<taskmanagerid>/log-url

有关Container日志的信息

  1. JobManager信息:暂无,可以通过 <jobId> 反查 applicationId 直接调用Yarn REST API
  2. TaskManager信息:可以通过下方链接文章的顺序获得,或者直接通过上述JSON获得 Vertex 中的TaskManager信息
  • A. 寻找 path=/jobs/<jobid>/vertices/ 找到所有的 vertexid

  • B. 寻找 path=/jobs/<jobid>/vertices/<vertexid>/taskmanagers 的值并建立映射关系

例如

{"id": "cbc357ccb763df2852fee8c4fc7d55f2","name": "Source: Custom Source -> Process -> Map","now": 1731465012643,"taskmanagers": [{"host": "kka136119:40079","status": "CANCELED","taskmanager-id": "container_e87_1724243239726_2160_01_000003"},{"host": "kka129134:40853","status": "CANCELED","taskmanager-id": "container_e87_1724243239726_2160_01_000002"}...]
}

通过API调用和这里JSON数据是一样的。

TaskManager hostName截断

因为HostNameSupplier 即便在开启了TaskManager反解析,即 jobmanager.retrieve-taskmanager-hostname,也只会返回第一段。(默认开启,不建议关闭)

org.apache.flink.runtime.taskmanager.TaskManagerLocation.DefaultHostNameSupplier#getHostNameIf the FQDN is the textual IP address, then the hostname is also the IP address
If the FQDN has only one segment (such as "localhost", or "host17"), then this is used as the hostname.
If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first segment (here "worker3") will be used as the hostname

如果对Yarn进行日志集成,则需要自行补全 HostName

日志链接的实现参考

【Flink系列十八】History Server 重新登场,如何实现Yarn日志集成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/833051.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GIS工具哪家强?五款优质GIS工具箱对比分析

本文将为大家介绍五款功能各异的GIS工具箱,包括GISBox、QGIS、MapTiler、Saga GIS和Whitebox GAT。每款工具箱都有其独特的功能和应用场景,能够满足不同类型的GIS任务需求。无论是数据处理、空间分析、影像处理还是可视化需求,这些工具都能为用户提供丰富的解决方案。本文将…

安装Kibana__基于Windows系统

在此之前,请先安装Elasticsearch并启动一、下载安装包并解压安装https://www.elastic.co/downloads/kibanaterminal进入Kibana安装目录,通过.\bin\kibana.bat指令启动 二、配置后登录Kibana页面 启动成功,访问http://localhost:5601/,页面会要求你填入EnrollmentToken(此…

【Maya 2025软件下载与安装教程 含补丁】

1、安装包 「maya2025」: 链接:https://pan.quark.cn/s/de0d9d452470 提取码:Rhjp 「maya2024」: 链接:https://pan.quark.cn/s/887e52b68f51 提取码:jvyp 「maya2023」: 链接:https://pan.quark.cn/s/71f46b3d26e5 提取码:b6mA 「maya2020」: 链接:https://pan.qua…

Prometheus + Grafana 监控平台搭建

1、下载 prometheus和node_exporter:https://prometheus.io/download/ 下载完后上传到服务器解压 tar -zxvf prometheus-3.0.0-rc.1.linux-amd64.tar.gztar -zxvf node_exporter-1.8.2.linux-amd64.tar.gz2、启动 node_exporter nohup ./node_exporter --web.listen-address…

阿里云可观测 2024 年 10 月产品动态

阿里云可观测 2024 年 10 月产品动态

题解:CF2025E Card Game

在这 权当卡特兰数的复习题吧。不会卡特兰数的可以先看文末。如果没有花色 \(1\) 这道题就很简单了,对于每个别的花色都有 \(C(m)\) 种分配方案。\(C(n)\) 表示卡特兰数的第 \(n\) 项,答案就是乘起来。 发现除了花色 \(1\) 每种花色的牌都是独立的。这启示我们枚举每种牌用了…

应用网关的演进历程和分类

网关作为互联网流量的入口,其形态也在跟随软件架构持续演进迭代中。我们下面就聊一聊网关的演进历程以及在时下火热的 AI 浪潮下,网关又会迸发怎样新的形态。作者:耿蕾蕾(如葑) 唯一不变的是变化,在现代复杂的商业环境中,企业的业务形态与规模往往处于不断变化和扩大之中…

Vue网站发布到iis后提示404页面不可访问

vue重定向和跨域配置:https://zhuanlan.zhihu.com/p/5306882511.安装组件:URL Rewrite:https://www.iis.net/downloads/microsoft/url-rewriteApplication Request Routing:https://www.iis.net/downloads/microsoft/application-request-routing2.新建一个web.config 放到…

fastadmin 数据记录行上添加操作按钮并设置权限

1. 一键 curd 以及配置菜单 编写控制器方法 - 业务逻辑 再次一键生成菜单 - 生成刚刚写审核通过方法的控制器。 2. 自定义控制器中方法。 3. 查看角色组的权限,并授予该角色权限。 4. 前端修改 index 页面,因为需要权限所以需要加上一句话data-operate-log="{:$auth-&g…

10 倍性能提升, GraalVM 应用可观测实践

ARMS 发布了支持 GraalVM 应用的 Java Agent 探针,可为 GraalVM 应用提供开箱即用的可观测能力。作者:铖朴、层风 GraalVM 静态编译 背景介绍 随着云原生浪潮的蓬勃发展,利用云原生技术为企业应用提供极致的弹性能力是企业数字化升级的核心诉求。但 Java 作为一种解释执行+运…

日立移动硬盘插在电脑有异响数据恢复

当日立移动硬盘插在电脑上出现异响且数据无法读取时,这通常表明硬盘可能遇到了某些问题。以下是一些建议的解决步骤和数据恢复方法: 一、异响原因排查 供电不足: 移动硬盘需要足够的电力供应才能正常工作。如果电脑的USB接口供电不足,可能会导致移动硬盘发出异响且无法读取…

辣椒销售策略:智慧应对顾客,洞悉销售真谛

售卖辣椒的商贩常常面临这样的询问:“你的辣椒辣不辣?”回答这个问题时,他们面临两难:若答辣,怕辣之人即刻离去;若答不辣,或许又错失了喜辣的顾客,交易依旧难以达成。 某日闲暇,我驻足于一位售卖辣椒的妇人三轮车旁,好奇她如何解决这一逻辑悖论。 见暂无顾客,我自以…