不同环境Flink配置信息是不同的,为了区分不同环境的配置文件,使用ParameterTool工具读取带有环境的配置文件信息
区分环境的配置文件
三个配置文件:
flink.properties
:决定那个配置文件生效
flink-dev.properties
:测试环境配置文件
flink-prod.properties
:生产环境配置文件
flink.properties配置文件中只配置一项flink.env.active=dev
,读取该配置项然后组装出生效的配置文件名
工具类实现
public class ParameterUtil {/*** 默认配置文件 flink.properties*/private static final String DEFAULT_CONFIG = ParameterConstants.FLINK_ROOT_FILE;/*** 带环境配置文件 flink-%s.properties*/private static final String FLINK_ENV_FILE = ParameterConstants.FLINK_ENV_FILE;/*** 环境变量 flink.env.active*/private static final String ENV_ACTIVE = ParameterConstants.FLINK_ENV_ACTIVE;/*** 配置文件+启动参数+系统环境变量 生成ParameterTool*/public static ParameterTool getParameters(final String[] args) {/* *********************** Java读取资源的方式:** a. Class.getResourceAsStream(Path): Path 必须以 “/”,表示从ClassPath的根路径读取资源* b. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”, 默认从ClassPath的根路径读取资源** 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找* *********************/InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {// 读取根配置文件ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream);// 获取环境参数String envActive = getEnvActiveValue(defaultPropertiesFile);// 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从启动参数中获取配置.mergeWith(ParameterTool.fromArgs(args))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (IOException e) {throw new RuntimeException("");}}/*** 配置文件+系统环境变量 生成ParameterTool*/public static ParameterTool getParameters() {InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);/* ************************ 注意:** ParameterTool 读取配置文件需要抛出 IOException,* IOException 的捕捉就在这里 catch** 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改** *********************/try {ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream);//获取环境参数String envActive = getEnvActiveValue(defaultPropertiesFile);//读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)return ParameterTool// ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量// 从配置文件获取配置.fromPropertiesFile(//当前线程Thread.currentThread()//返回该线程的上下文信息, 获取类加载器.getContextClassLoader().getResourceAsStream(envActive))// 从系统环境变量获取配置.mergeWith(ParameterTool.fromSystemProperties());} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}/*** 获取环境配置变量*/private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive = null;if (defaultPropertiesFile.has(ENV_ACTIVE)) {envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}/*** 从配置文件参数配置流式计算的上下文环境*/public static void envWithConfig(StreamExecutionEnvironment env,ParameterTool parameterTool) {/* ************************ checkpoint 设置** 1.* 若checkpoint 时间不要设置太短,* 这里的时间包括了超时时间** 2.* 设置了周期性checkpoint,* 若上一个周期的checkpoint没完成,* 下一个周期的checkpoint不会开始的.** 3.* 若checkpoint的持续时间超过了超时时间,* 会出现排队,* 过多的checkpoint排队会耗费资源** 4.* 为了解决checkpoint排队堆积,* 需要优化checkpoint的完成效率** *********************/// 每60秒触发checkpointenv.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));CheckpointConfig ck = env.getCheckpointConfig();// checkpoint 必须在60秒内结束,否则被丢弃ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));// checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));// checkpoint 语义设置为 精确一致( EXACTLY_ONCE )ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 最多允许 checkpoint 失败 3 次ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));// 同一时间只允许一个 checkpoint 进行ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));// 设置 State 存储env.setStateBackend(new HashMapStateBackend());// 并行度设置env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));}}
读取环境信息
该方法会读取 flink.properties 配置的生效的配置文件,组装成要读取的配置文件
// flink.properties InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);ParameterTool defaultPropertiesFile =ParameterTool.fromPropertiesFile(inputStream); private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {// 选择参数环境String envActive = null;// 配置文件中是否有该属性 flink.env.activeif (defaultPropertiesFile.has(ENV_ACTIVE)) {// 有的话,直接拼装 flink-%s.properties -> flink-dev.propertiesenvActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));}return envActive;}
ParameterTool 获取参数的3种方式
-
fromPropertiesFile 配置文件
-
fromArgs 程序启动参数
- 或者 -- 开头 空格分隔
, 如:-name likelong --age 21 -
fromSystemProperties 系统环境变量, 包括程序 -D启动的变量
内部调用的是 Java提供的 System.getProperties()
ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量
因此,上述ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量
ParameterTool 注册 global 变量
ParameterTool 注册为 global 变量:
env.getConfig().setGlobalJobParameter()
这样, 在上下文中就能获取 ParameterTool
(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
【该方法可以在富函数生命周期方法中调用】
如下:
private static void initEnv(String[] args) {// ParameterTool 注册为 globalparameterTool = ParameterUtil.getParameters();env.getConfig().setGlobalJobParameters(parameterTool);// 配置上下文环境ParameterUtil.envWithConfig(env, parameterTool);}