[Flink04] Flink部署实践

    Flink部署支持三种模式:本地部署、Standalone部署、Flink on Yarn部署。

    独立(Standalone)模式由Flink自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    Flink on Yarn模式,把资源管理交给Yarn实现,计算机资源统一由Haoop Yarn管理,生产环境测试。

     Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处.

1 基础环境

1.1 服务器环境

操作系统环境为CentOS 8。

1)配置规划集群节点间免密访问

参考相关章节或附录的指南配置,可以有效提供部署和管理效率。

2)配置JAVA环境

参考相关章节或附录的指南配置。

3)配置HDFS存储集群

如果需要与HDFS存储集群集成,则需要提前完成配置。

参考相关章节或附录的指南配置,并且Flink规划集群或设备可网络访问。

4)配置zookeeper集群

如果需要部署Standalone模式,则需要提前完成配置。参考相关章节或附录的指南配置。

5)配置 Yarn集群

如果需要部署Flink on Yarn模式,则需要提前完成配置。参考相关章节或附录的指南配置。

1.2 Flink软件基础配置

在本实践案例中,采用的Flink软件包版本 1.14.5,Hadoop的版本为3.2,Spark软件的根目录(SPARK_HOME)为/opt/flink/flink。

源码下载可以通过官方源和国内源两种方式下载,官方源再国外,下载速度慢,国内源采用清华大学的源,速度相对较快,但只保留最新版本。

Apache官方:https://archive.apache.org/dist/flink/

清华大学:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

下载软件包:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz

解压软件包:

# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz
# ln -s flink-1.14.5 flink
# ls flink/
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt

本地模式

最简单的启动方式,其实是不搭建集群,直接本地模式启动。

2.1 配置部署

在本地模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink 的进程,适用于测试开发调试等,不用更改任何配置信息,只需要保证 JDK8 安装正常即可。

1)启动命令

# /usr/local/flink/bin/start-cluster.sh

2)关闭命令

# /usr/local/flink/bin/stop-cluster.sh

2.2 测试验证

1)Flink启动

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake01.
Starting taskexecutor daemon on host lake01

2)访问验证

通过<local主机地址>:8081打开

3)jps查看

# jps
3968 Jps
1941 NameNode
3685 TaskManagerRunner
2790 NodeManager
3418 StandaloneSessionClusterEntrypoint
2159 DataNode

4)执行官方用例WordCount

通过执行官方示例,可以看到flink任务运行成功

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d6987ed5b263fc5e297d5be1e28b465a
Program execution finished
Job with JobID d6987ed5b263fc5e297d5be1e28b465a has finished.
Job Runtime: 373 ms
Accumulator Results:
- 843a1470cb2c3e3169dfb25bcda7369d (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
……

观察Flink WebUI,如下图

2.3 问题-提示无法连接Yarn服务

一、问题描述

从flink on yarn模式切换为本地模式,执行start-cluster.sh提示如下错误:

2022-10-13 20:38:05,757 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink/flink-1.14.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-10-13 20:38:05,946 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at **lake02/******:8032
2022-10-13 20:38:06,017 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-10-13 20:38:07,043 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: **lake02/******:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
……
2022-10-13 20:38:16,052 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: **lake02/******:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused

二、问题分析

通过jps发现缺少TaskManagerRunner。

# jps
1902910 StandaloneSessionClusterEntrypoint
1861160 RunJar
1903349 SqlClient
1861350 RunJar
128746 NameNode
200744 QuorumPeerMain
1865793 Kafka
1911504 Jps

发现workers和masters文件均为空

三、解决方案

恢复masters和workers的内容

# cat masters
localhost:8081# cat workers
localhost

3 Standalone模式

3.1 概述

     Standalone模式是最简单的一种集群模式,不需要Yarn、mesos等资源调度平台,自带集群,资源管理由flink集群管理,开发环境测试使用。

    Standalone模式是一种主从模式,主要有两个组件构成分别是JobManager(Master)和TaskManager(Slave)。

当一个应用提交执行时,Flink的各个组件是如何交互协作的:

1)App程序通过rest接口提交给Dispatcher(rest接口是跨平台,并且可以直接穿过防火墙,不需考虑拦截)。

2)Dispatcher把JobManager进程启动,把应用交给JobManager。

3)JobManager拿到应用后,向ResourceManager申请资源(slots),ResouceManager会启动对应的TaskManager进程,TaskManager空闲的slots会向ResourceManager注册。

4)ResourceManager会根据JobManager申请的资源数量,向TaskManager发出指令(这些slots由你提供给JobManager)。

5)接着,TaskManager可以直接和JobManager通信了(它们之间会有心跳包的连接),TaskManager向JobManager提供slots,JobManager向TaskManager分配在slots中执行的任务。

6)最后,在执行任务过程中,不同的TaskManager会有数据之间的交换。

3.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

OScentos8.0

labnode02

192.168.80.132

slavetaskmanager

OScentos8.0

labnode03

192.168.80.133

slavetaskmanager

OScentos8.0

二、修改配置文件

1)修改flink-conf.yaml配置文件:

##配置master节点ip
jobmanager.rpc.address: 192.168.1.100##配置每个节点的可用slot,1 核CPU对应 1 slot
##the number of available CPUs per machine
taskmanager.numberOfTaskSlots: 30##默认并行度 1 slot资源
parallelism.default: 1

2)修改master和work配置文件

Master文件

# cat masters
labnode01:8081

workers文件

labnode02
labnode03

将以上文件分发各节点对应文件夹。

三、集群启动和关闭

在master节点上执行此脚本,就可以启动集群,前提要保证master节点到slaver节点可以免密登录。

因为它的启动过程是:先在master节点启动jobmanager进程,然后ssh到各slaver节点启动taskmanager进程。

启动集群

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake02.
Starting taskexecutor daemon on host lake03.
Starting taskexecutor daemon on host lake04.
Starting taskexecutor daemon on host slake05.

停止集群:

# /usr/local/flink/bin/stop-cluster.sh

3.3 运行验证

1)启动Flink

# /usr/local/flink/bin/start-cluster.sh

2)访问flink webUI

3)执行官方用例WordCount

执行命令:

# /usr/local/flink/bin/flink run /usr/local/flink/examples/batch/WordCount.jar
……
- f27663f6191a378629eea720a988cc53 (java.util.ArrayList) [170 elements](a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
……

查看Flink WebUI

4 Flink On Yarn模式

4.1 概述

    独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    在目前大数据生态中,国内应用最为广泛的资源管理平台是Yarn。Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处。

    Flink on Yarn 企业生产环境运行Flink任务大多数的选择。

    在强大的Yarn平台上,Flink是如何在Yarn上集成部署的,其过程是:客户端把Flink 应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的 NodeManager 申请容器。在这些容器上,Flink会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的 Slot 数量动态分配TaskManager资源。

https://upload-images.jianshu.io/upload_images/6178553-c51c1c9b2f695e5b.png

1)提交App之前,先上传Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的数据。

2)客户端向ResourceManager提交Job,ResouceManager接到请求后,先分配container资源,然后通知NodeManager启动ApplicationMaster。

3)ApplicationMaster会加载HDFS的配置,启动对应的JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。

4)接着,JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再启动TaskManager)。container在启动TaskManager时也会从HDFS加载数据。

5)最后,TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。

Flink提供了yarn上运行的3模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。

4.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

labnode02

192.168.80.132

slavetaskmanager

labnode03

192.168.80.133

slavetaskmanager

二、Yarn环境配置

在Yarn-site.xml中配置关闭内存校验。

Yarn-site.xml是hadoop中/etc/hadoop下的配置文件,否则flink任务可能会因为内存超标而被Yarn集群主动杀死。

<!-- Mem Check Start -->
<!-- 设置不检查虚拟内存的值,不然内存不够会报错 --><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property><!-- Mem Check end -->

将修改后的配置文件分发到各节点,然后重启Yarn集群。

三、将Flink软件和配置文件分发到Flink集群规划节点

将Flink的配置文件conf/flink-conf.yaml恢复为初始状态。

4.3 Session-Cluster模式(yarn-session)

4.3.1 概述

    Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。

    在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.

    缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.

    所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。

会话模式有两种操作模式:

  1. 附加模式(默认):yarn-session.sh客户端将 Flink 集群提交给 YARN,但客户端一直在运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会发出集群关闭的信号。
  2. 分离模式(-d或--detached):yarn-session.sh客户端将 Flink 集群提交给 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。

4.3.2 常用命令

1)yarn-session.sh参数说明

使用bin/yarn-session.sh --help 查看可用参数:

Usage:Optional-at,--applicationType <arg>     Set a custom application type for the application on Yarn-D <property=value>             use value for given property-d,--detached                   If present, runs the job in detached mode-h,--help                       Help for the Yarn session CLI.-id,--applicationId <arg>       Attach to running Yarn session-j,--jar <arg>                  Path to Flink jar file-jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)-m,--jobmanager <arg>           Set to Yarn-cluster to use Yarn execution mode.-nl,--nodeLabel <arg>           Specify Yarn node label for the Yarn application-nm,--name <arg>                Set a custom name for the application on Yarn-q,--query                      Display available Yarn resources (memory, cores)-qu,--queue <arg>               Specify Yarn queue.-s,--slots <arg>                Number of slots per TaskManager-t,--ship <arg>                 Ship files in the specified directory (t for transfer)-tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)-yd,--Yarndetached              If present, runs the job in detached mode (deprecated; use non-Yarn specific option instead)-z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

2)启动命令

使用Yarn-session.sh命令申请资源初始化一个Flink集群,命令格式如下:

bin/yarn-session.sh <参数>

如示例:

# /opt/flink/flink/bin/yarn-session.sh -d

3)关闭Flink

停止 flink on Yarn 会话模式中的flink集群

yarn application -kill <appid>

echo "stop" | ./bin/flink -id <appid>

如示例:

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.3.3 运行验证

1)启动Flink

# /opt/flink/flink/bin/yarn-session.sh -d

执行结果:

访问Yarn WebUI:

访问Flink WebUI,http://lake04:38347

2)运行官方用例WordCount

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar

命令行执行结果:

Flink WebUI的首页:

Flink WebUI中的结果:

3)关闭

执行命令关闭Flink

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.4 Per-Job-Cluster模式(yarn-cluster)

4.4.1 概述

一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

4.4.2 常用命令

1)参数说明

flink run -m yarn-cluster --help;可用参数:

该模式下不需要先启动 yarn-session,确保 Hadoop 集群是健康的情况下直接提交 Job 命令:

bin/flink -m yarn-cluster <参数> <jar file>

如示例:

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

4.4.3 运行验证

1)启动并执行官方用例WordCount

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.4.4 优缺点

优点:随到随用,只有任务需要运行时才会开启flink集群;运行完就关闭释放资源,资源利用更合理;

缺点:对于小作业不太友好,

适用场景:适合大作业,长时间运行的大作业。

4.5 Application Mode

4.5.1 概述

    Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.

与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数式在集群中执行的

官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

4.5.2 运行验证

启动

# /opt/flink/flink/bin/flink run-application -t yarn-application /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.5.3 常见问题

任务提示 Could not allocate the required slot within slot request tim

一、错误日志

Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResource$8(DefaultScheduler.java:539)... 37 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)... 35 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)... 28 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms... 29 more

解决方案

将flink的配置文件conf/flink-conf.yaml恢复为初始状态,重新启动flink的Yarn session。

问题FLINK Could not get job jar and dependencies from JAR file: JAR file does not exist:

一、问题描述

使用flink客户端将执行flink提交到Yarn,输入-yjm参数提示错误

# /opt/flink/flink/bin/flink run -m Yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-3.2.4/share/hadoop/common/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Could not get job jar and dependencies from JAR file: JAR file does not exist: -yn

二、问题分析

flink1.8版本之后已弃用该参数,ResourceManager将自动启动所需的尽可能多的容器,以满足作业请求的并行性。

三、解决方案

去掉即可

Deployment took more than 60 seconds. Please check if the requested resources are

一、问题描述

日志信息如下:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

截图如下:

二、解决方案

配置yarn-site.xml

  <property><name>yarn.scheduler.minimum-allocation-mb</name><value>1024</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>102400</value></property><property><name>yarn.nodemanager.resource.cpu-vcores</name><value>32</value></property><property><name>yarn.nodemanager.resource.memory-mb</name><value>51200</value></property>

Flink读取Hudi表时报错lassNotFoundException: *mapred.FileInputFormat

一、问题现象:

执行“select * from t1;”报错,报错信息如下:

Flink SQL> select * from t1;
……
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hadoop-mapreduce-client-core.jar到Flink/lib中。

读取数据表失败NoSuchMethodError: *Preconditions.checkArgument

一、问题描述

创建表格式如下

CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://10.101.12.140:9000/datas/flink-hudi/test0907/t1',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210316134557',
'read.streaming.check-interval' = '4'
);
INSERT INTO t2 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

执行成功,然后执行表内容查询

select * from t2;

报出如下错误:

Flink SQL> select * from t2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForm
Flink SQL>

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hive-exec-3.1.3.jar到各节点的flink/lib中。(注意hive-exec和hadoop版本的匹配)

启动失败NoSuchMethodError: *Preconditions.checkArgument

一、错误描述

通过bin/yarn-session.sh -d启动yarn-session失败,报错信息如下:

The program finished with the following exception:org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.reflect.InvocationTargetExceptionat org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:81)at org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.getProxy(HadoopYarnProtoRPC.java:48)……... 21 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)Vat org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)……at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.<init>(ApplicationClientProtocolPBClientImpl.java:209)... 26 more

二、错误原因

Preconditions是guava下的工具类,hudi的源码依赖了不同的项目,这些项目使用了不同的guava版本,所报错误是由于运行时guava版本过旧,没有相应的方法。

三、解决方案

在HADOOP_HOME下查询hadoop使用的guava版本,将其拷贝到FLINK_HOME/lib下:

# find ./ -name guava*
./share/hadoop/common/lib/guava-27.0-jre.jar
./share/hadoop/hdfs/lib/guava-27.0-jre.jar

将文件复制到所有yarn集群的FLINK_HOME/lib下。

重新执行bin/yarn-session.sh -d,成功。

通过yarn启动flink失败-连接yarn失败

通过yarn模式启动flink,报出如下异常,关键信息如下:

# /usr/local/flink/bin/yarn-session.sh -d
….
2022-10-27 11:04:30,332 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at /0.0.0.0:8032
…
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: 0.0.0.0/0.0.0.0:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused
……

原因分析:

1)检查是否启动hadoop集群, 如果没有启动, 是无法连接到hadoop的yarn。

2)flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs。

如果正常启动还无法连接yarn, 可以查看一下hadoop的环境变量是否配置好。

在本实例中,时因为无法获取HADOOP_CONF_DIR的配置信息导致问题发生。

二、解决方案

设置HADOOP_CONF_DIR环境变量,并使之生效。

# cat /etc/profile | grep HADOOP_CONF_DIR
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# source /etc/profile

然后重新启动flink。

BTW:如果已经设置HADOOP_CONF_DIR环境变量,可能由于某种原因HADOOP_CONF_DIR环境变量没有生效,这个原因有很多。

5 参考资料

[01] https://blog.csdn.net/Vector97/article/details/117398947

[02] https://www.jianshu.com/p/8c9c897ea72a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/478014.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

遇到问题(二) 中文乱码

例如这样&#xff1a; 原本是这样&#xff1a; 解决方法&#xff1a;点击扳手工具设置——Editor——Encoding——选chinese GB2312&#xff08;有的是UTF-8&#xff09;

同学在外包干了两年的点点点,24岁人就快废了

前言 简单的说下&#xff0c;我大学的一个同学&#xff0c;毕业后我自己去了自研的公司&#xff0c;他去了外包&#xff0c;快两年了我薪资、技术各个方面都有了很大的提升&#xff0c;他在外包干的这两年人都要废了&#xff0c;技术没一点提升&#xff0c;学不到任何东西&…

在ubuntu中制作ubuntu的U盘启动盘

概要&#xff1a; 本篇演示在ubuntu22.04中制作ubuntu22.04的U盘启动盘 一、下载ubuntu22.04的iso文件 访问ubuntu官网https://ubuntu.com自行下载ubuntu官网 二、制作U盘启动盘 打开系统自带软件Startup Disk Creator 软件会自动检测iso文件和U盘 点击Make Startup Disk…

Java 线程通信模型小案例

Java 线程通信模型小案例 package com.zhong.thread.usethread;import java.util.ArrayList; import java.util.List;/*** ClassName : CookAndFood* Description : 锁的应用厨师和包子问题* Author : zhx* Date: 2024-02-19 15:43*/ public class CookAndFood {public static …

一分钟学会如何查看Python内置函数的用法及其源码

在用Python进行各种分析的时候&#xff0c;我们会用到各种各样的函数&#xff0c;比如&#xff0c;我们用SQL时&#xff0c;经常使用join、max等各种函数&#xff0c;那么想看Python是否有这个函数&#xff0c;这个时候可能大部分人会百度&#xff0c;那么如何不使用百度&#…

【设计模式】4、策略模式

文章目录 一、问题二、解决方案2.1 真实世界的类比2.2 策略模式结构2.3 适用场景2.4 实现方式2.5 优缺点2.6 与其他模式的关系 三、示例代码3.1 go3.2 rust3.2.1 通过 trait 实现3.2.2 function closure 策略模式是一种行为设计模式&#xff0c;它能定义一系列算法&#xff0c…

第五次作业:LMDeploy 的量化和部署

参考文档&#xff1a;https://github.com/InternLM/tutorial/blob/main/lmdeploy/lmdeploy.md 基础作业&#xff1a; 使用 LMDeploy 以本地对话、网页Gradio、API服务中的一种方式部署 InternLM-Chat-7B 模型&#xff0c;生成 300 字的小故事&#xff08;需截图&#xff09; …

上网行为监控软件能够看到聊天内容吗

随着网络技术的迅猛发展和广泛应用&#xff0c;上网行为监控软件逐渐成为许多企业和组织维护网络安全、提高工作效率的重要工具。这些软件可以实时监控和记录员工的网络活动&#xff0c;包括访问的网站、下载的文件、使用的应用程序等。 然而&#xff0c;一个常见的问题是&…

计算机网络体系结构和参考模型

目录 1、分层结构 2、协议、接口、服务 3、7层OSI模型 4、4层TCP/IP模型 5、5层参考模型 1、分层结构 1.1、为什么需要分层结构&#xff1f; 在网络上传输数据前需要完成一些功能&#xff1a; 1)、发起通信的计算机需要将数据通信的通路进行激活 2)、要告诉网络如何识别…

基于SpringBoot+Dubbo构建的电商平台-微服务架构、商城、电商、微服务、高并发、kafka、Elasticsearc+源代码+文档说明

文章目录 项目用到的技术前端使用的技术后端使用的技术项目模块说明项目搭建方式项目开发进度源码下载地址 项目基于springboot2.1.6.RELEASEDubbo2.7.3 来构建微服务。 业务模块划分&#xff0c;尽量贴合互联网公司的架构体系。所以&#xff0c;除了业务本身的复杂度不是很高之…

MySQL数据库基础(十):DQL数据查询语言

文章目录 DQL数据查询语言 一、数据集准备 二、select查询 三、简单查询 四、条件查询 1、比较查询 2、范围查询 3、逻辑查询 4、模糊查询 5、非空查询 五、排序查询 六、聚合查询 七、分组查询与having子句 1、分组查询介绍 2、group by的使用 3、group by 聚…

linux搭建测试环境详细过程

前言 本文记录下&#xff0c;测试人员如何搭建测试环境&#xff0c;以供后面自己方便找&#xff0c;大家可以借鉴下搭建测试环境需要安装的有&#xff1a;nginx&#xff0c;redis&#xff0c;mysql&#xff0c;java&#xff0c;docker&#xff0c;保证这几个基本就可以用了&…