【动态读取配置文件】ParameterTool读取带环境的配置信息

不同环境Flink配置信息是不同的,为了区分不同环境的配置文件,使用ParameterTool工具读取带有环境的配置文件信息

区分环境的配置文件

image-20231217202925250

三个配置文件:

flink.properties:决定那个配置文件生效

flink-dev.properties:测试环境配置文件

flink-prod.properties:生产环境配置文件

flink.properties配置文件中只配置一项flink.env.active=dev,读取该配置项然后组装出生效的配置文件名

工具类实现

public class ParameterUtil {/*** 默认配置文件 flink.properties*/private static final String DEFAULT_CONFIG = ParameterConstants.FLINK_ROOT_FILE;/*** 带环境配置文件 flink-%s.properties*/private static final String FLINK_ENV_FILE = ParameterConstants.FLINK_ENV_FILE;/*** 环境变量 flink.env.active*/private static final String ENV_ACTIVE = ParameterConstants.FLINK_ENV_ACTIVE;/*** 配置文件+启动参数+系统环境变量 生成ParameterTool*/public static ParameterTool getParameters(final String[] args) {/* *********************** Java读取资源的方式:** a. Class.getResourceAsStream(Path): Path 必须以 “/”,表示从ClassPath的根路径读取资源* b. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”, 默认从ClassPath的根路径读取资源** 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找* *********************/InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {// 读取根配置文件ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream);// 获取环境参数String envActive = getEnvActiveValue(defaultPropertiesFile);// 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从启动参数中获取配置.mergeWith(ParameterTool.fromArgs(args))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (IOException e) {throw new RuntimeException("");}}/*** 配置文件+系统环境变量 生成ParameterTool*/public static ParameterTool getParameters() {InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);/* ************************ 注意:** ParameterTool 读取配置文件需要抛出 IOException,* IOException 的捕捉就在这里 catch** 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改** *********************/try {ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream);//获取环境参数String envActive = getEnvActiveValue(defaultPropertiesFile);//读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}/*** 获取环境配置变量*/private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive = null;if (defaultPropertiesFile.has(ENV_ACTIVE)) {envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}/*** 从配置文件参数配置流式计算的上下文环境*/public static void envWithConfig(StreamExecutionEnvironment env,ParameterTool parameterTool) {/* ************************ checkpoint 设置** 1.* 若checkpoint 时间不要设置太短,* 这里的时间包括了超时时间** 2.* 设置了周期性checkpoint,* 若上一个周期的checkpoint没完成,* 下一个周期的checkpoint不会开始的.** 3.* 若checkpoint的持续时间超过了超时时间,* 会出现排队,* 过多的checkpoint排队会耗费资源** 4.* 为了解决checkpoint排队堆积,* 需要优化checkpoint的完成效率** *********************/// 每60秒触发checkpointenv.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));CheckpointConfig ck = env.getCheckpointConfig();// checkpoint 必须在60秒内结束,否则被丢弃ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));// checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));// checkpoint 语义设置为 精确一致( EXACTLY_ONCE )ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 最多允许 checkpoint 失败 3 次ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));// 同一时间只允许一个 checkpoint 进行ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));// 设置 State 存储env.setStateBackend(new HashMapStateBackend());// 并行度设置env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));}}

读取环境信息

该方法会读取 flink.properties 配置的生效的配置文件,组装成要读取的配置文件

		// flink.properties	InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream);  private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive = null;// 配置文件中是否有该属性 flink.env.activeif (defaultPropertiesFile.has(ENV_ACTIVE)) {// 有的话,直接拼装 flink-%s.properties -> flink-dev.propertiesenvActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}

ParameterTool 获取参数的3种方式

  1. fromPropertiesFile 配置文件

  2. fromArgs 程序启动参数

    - 或者 -- 开头 空格分隔, 如:-name likelong --age 21

  3. fromSystemProperties 系统环境变量, 包括程序 -D启动的变量

    内部调用的是 Java提供的 System.getProperties()

ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量

因此,上述ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量

ParameterTool 注册 global 变量

ParameterTool 注册为 global 变量:env.getConfig().setGlobalJobParameter()

这样, 在上下文中就能获取 ParameterTool

(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()

【该方法可以在富函数生命周期方法中调用】

如下:

    private static void initEnv(String[] args) {// ParameterTool 注册为 globalparameterTool = ParameterUtil.getParameters();env.getConfig().setGlobalJobParameters(parameterTool);// 配置上下文环境ParameterUtil.envWithConfig(env, parameterTool);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/282416.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 中查询字段内容长度以及是否是纯数字的函数

1、字段内容长度查询 select * from abpusers where LENGTH(UserName)<4 结果&#xff1a; 2、 查询纯数字&#xff0c;使用正则表达式 select * from abpusers where UserName REGEXP ^[0-9]$ 结果如下&#xff1a;

【C语言加油站】qsort函数的模拟实现

qsort函数的模拟实现 导言一、回调函数二、冒泡排序2.1 冒泡排序实现升序 三、qsort函数3.1 qsort函数的使用3.2 比较函数 四、通过冒泡排序模拟实现qsort函数4.1 任务需求4.2 函数参数4.3 函数定义与声明4.4 函数实现4.4.1 函数主体4.4.2 比较函数4.4.3 元素交换 4.5 my_qsort…

navicat连接mysql报错过程以及解决

1.刚开始报错如下图 于是我利用这段报错信息&#xff08;2059 - Authentication plugin caching sha2 password cannot be loaded&#xff09;百度。 1.1上面报错的原因和解决过程 百度说是mysql的加密方式不对&#xff0c;如下图 所以这里进入数据库&#xff0c;修改mysql这…

缓存击穿的原因和解决方案

缓存击穿 原因&#xff1a;一个被高并发访问并且缓存重建业务较复杂的key突然失效了&#xff0c;无数的请求访问会在瞬间给数据库带来巨大的冲击 解决方案 1.互斥锁 优点 没有额外的内存消耗保证一致性实现简单 缺点 线程需要等待&#xff0c;性能受影响可能有死锁风险 …

使用下载代替物理串口输出-STM32 Debug (printf) Viewer

使用下载代替物理串口输出-STM32 Debug 硬件要求配置方法代码要求打印输出结果 硬件要求 STM32的PB9、PB10引脚的串口1通常用作其他功能使用后&#xff0c;无法通过printf()函数打印输出想要调试输出查看变量或调试信息。现已使用另外一种方法实现printf()函数打印输出。 ST…

EasyExcel实现⭐️本地excel数据解析并保存到数据库的脚本编写,附案例实现

目录 前言 一、 EasyExcel 简介 二、实战分析 1.Controller控制层 2. service方法和方法实现 3.EasyExcel相关类 3.1 excel表实体类 3.2 自定义监听器类 4.测试 4.1 准备工作 4.2 断点调试 5.生成脚本文件 三、分析总结 章末 小伙伴们大家好&#xff0c;最近开发的时…

JavaSE第7篇:封装

文章目录 一、封装1、好处:2、使用 二、四种权限修饰符三、构造器1、作用2、说明3、属性赋值的过程 一、封装 封装就是将类的属性私有化,提供公有的方法访问私有属性 不对外暴露打的私有的方法 单例模式 1、好处: 1.只能通过规定的方法来访问数据 2.隐藏类的实例细节,方便…

【JavaEE】多线程案例 - 定时器

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文于《JavaEE》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力)打造&…

深度学习项目实战:垃圾分类系统

简介&#xff1a; 今天开启深度学习另一板块。就是计算机视觉方向&#xff0c;这里主要讨论图像分类任务–垃圾分类系统。其实这个项目早在19年的时候&#xff0c;我就写好了一个版本了。之前使用的是python搭建深度学习网络&#xff0c;然后前后端交互的采用的是java spring …

python学习2

大家好&#xff0c;这里是七七&#xff0c;这次学习的例子是一个数据清洗代码。完整代码在最后。 开始这次的内容 目录 代码一 代码二 代码三 代码四 全部代码 代码一 xlsx_file data/附件1.xlsx df_1 pd.read_excel(xlsx_file) xlsx_file data/附件2.xlsx df pd.re…

workflow系列教程(4)Parallel并联任务流

往期教程 如果觉得写的可以,请给一个点赞关注支持一下 观看之前请先看,往期的博客教程,否则这篇博客没办法看懂 workFlow c异步网络库编译教程与简介 C异步网络库workflow入门教程(1)HTTP任务 C异步网络库workflow系列教程(2)redis任务 workflow系列教程(3)Series串联任务流…

一个 tomcat 下如何部署多个项目?附详细步骤

一个tomcat下如何部署多个项目&#xff1f;Linux跟windows系统下的步骤都差不多&#xff0c;以下linux系统下部署为例。windows系统下部署同理。 1 不修改端口&#xff0c;部署多个项目 清楚tomcat目录结构的应该都知道&#xff0c;项目包是放在webapps目录下的&#xff0c;那…