Flink之状态TTL机制内容详解

1 状态TTL机制

状态的 TTL机制就是Flink提供的自动化删除状态中的过期数据,配置 TTL的 API可以做到对状态中的数据进行冷热数据分离,将热数据一直保存在状态存储器中,将冷数据进行定期删除.
1.1 API简介

TTL常用API如下:

API注解
setTtl(Time.seconds(…))配置过期时长,当状态中的数据到达这个时长则判定为过期数据,在new StateTtlConfig.Builder(Time.seconds(...))也可以配置,如果同时调用setTtl()方法则进行覆盖
updateTtlOnCreateAndWrite()当该条数据在State中插入或者更新的时候,刷新计时,可用于冷热数据分离
updateTtlOnReadAndWrite()读或写都刷新该数据的TTL计时,可用于冷热数据分离
setStateVisibility(…)用于控制状态中过期数据的可见性,当方法中设置StateTtlConfig.StateVisibility.NeverReturnExpired)时则不可见过期未被清理的数据,如果设置StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp则可见过期未被清理的数据.setStateVisibility(...)由异步线程执行,默认是NeverReturnExpired.
setTtlTimeCharacteristic(…)指定TTL的时间语义,默认是event time,可以配置process time,将StateTtlConfig.TtlTimeCharacteristic.ProcessingTime填入方法的参数即可.
disableCleanupInBackground()禁用后台清理过期数据,使用后则不会清理过期数据
cleanupIncrementally(… , …)针对本地状态后端,即HashMapStateBackend. 增量清理, 每当访问状态数据时都会驱动一次过期检查,清除其中部分数据, 这也是HashMapBackend状态后端唯一能真正清理过期数据的方法,cleanupIncrementally(... , ...)方法中需要传入两个参数int cleanupSizeboolean runCleanupForEveryRecord,cleanupSize是指key的数据量,runCleanupForEveryRecord是指是否清理所有过期数据,如果runCleanupForEveryRecord设置的值为true此时cleanupSize就会失效,但是状态数据较多时会严重影响时效性.
cleanupFullSnapshot()针对快照数据,即checkpoint快照. 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是状态后端中的过期数据没有进行清理.
cleanupInRocksdbCompactFilter(xxx)针对于RocksdbStateBackend. 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDBCompact过程中根据过滤条件将过期数据删除,传入的参数为过期时间.
1.2 代码模板
  • 代码

    class StateMapFunc2 implements MapFunction<String, String>, CheckpointedFunction {private ListState<String> listState;@Overridepublic String map(String s) throws Exception {// 将数据添加到状态存储器中,split[0]为用户IDlistState.add(s);// 获取状态存储器中的数据Iterable<String> iter = listState.get();StringBuffer buffer = new StringBuffer();for (String str : iter) {buffer.append(str);}// 将数据添加到状态存储中return buffer.toString();}@Overridepublic void snapshotState(FunctionSnapshotContext ctx) throws Exception {}@Overridepublic void initializeState(FunctionInitializationContext ctx) throws Exception {OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();// 配置State TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 设置数据存活时长,当该数据在State中存活时间超过10s时删除该数据// 这个方法也是设置数据存活时长,和StateTtlConfig.Builder(Time.seconds(10))的作用一样,可以不用这个方法,如果用了会覆盖上面设置的时长.setTtl(Time.seconds(10))/*** updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二选一即可, 这两个方法的主要作用就是配合setTtl方法将冷热数据进行分离**/// 当该条数据在State中插入或者更新的时候,刷新计时.updateTtlOnCreateAndWrite()// 读或写都刷新该数据的TTL计时.updateTtlOnReadAndWrite()/*** setStateVisibility就是设置状态的可见性,前面setTtl方法是设置删除过期数据,删除过期数据实际上是由另一个异步线程周期性(定时器)的完成,也就是说超过10s的数据不一定会马上被删除,但是* 获取数据的时候底层会将超过存活时间的数据进行判断过滤,setStateVisibility就是可以设置是否可以查询到这些过期的数据,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二选一.**/// 不返回过期数据,这个也是默认策略.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 返回还没有被清除的过期数据.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)// 指定TTL计时时间语义(默认处理时间).setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)// 禁用后台清理过期数据.disableCleanupInBackground()/*** 针对本地状态后端,即HashMapStateBackend* 增量清理, 每当获取状态数据时,迭代器都会向前推进。对遍历的状态数据进行检查,并清理过期的数据* 参数1: 设置每次清理的key的数据量(copyOnWriteStateMap中的key的条目数量)* 参数2: 设置是否清理所有条目也就是key对应的数据,如果设置为true则参数1失效,在状态数据较多时不建议设置为true,会严重影响时效性**/.cleanupIncrementally(10, false)/*** 针对快照数据,即checkpoint快照* 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是不会清状态后端中的过期数据**/.cleanupFullSnapshot()/*** 针对于RocksdbStateBackend* 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDB在Compact过程中根据过滤条件将过期数据删除,传入的参数为过期时间(也就是发生Compact时的过滤条件)**/.cleanupInRocksdbCompactFilter(10000).build();// 配置状态描述,在ListStateDescriptor构造器中声明数据类型,简单类型可以使用xxx.class,符合类型需要使用到TypeInformation.of()ListStateDescriptor descriptor = new ListStateDescriptor("MapState", String.class);// 状态描述器加载TTL配置descriptor.enableTimeToLive(ttlConfig);listState = operatorStateStore.getListState(descriptor);}
    }
    

    代码中是以Operator State为例,如果是Keyed State则在open方法中配置TTL.

1.3 TTL机制详解

在代码模板中有API的使用方式,但是TTL机制不同的方法之间存在互斥或者互不影响的关系.

1.3.1 过期时间设置策略
过期时间设置有两种方式:
  • new StateTtlConfig.Builder(Time.seconds(10))
  • setTtl(Time.seconds(10))

这两种方式都是设置过期时间使用的,但是只需要选用其中一种即可,如果在创建StateTtlConfig对象时就设置了过期时间,又在setTtl方法中设置了过期时间,则会对过期时间进行覆盖,本质上二者都是对同一个变量进行赋值.

  • 源码

    new StateTtlConfig.Builder(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 调用Builder时对ttl变量进行了赋值public Builder(@Nonnull Time ttl) {this.ttl = ttl;}// ...
    }
    

    setTtl(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 这里同样是对ttl进行了赋值@Nonnullpublic Builder setTtl(@Nonnull Time ttl) {this.ttl = ttl;return this;}// ...
    }
    

    通过源码可以看出,使用此API时在创建StateTtlConfig对象时给一个过期时间即可,不需要再调用setTtl方法

1.3.2 过期时间刷新策略
过期时间刷新策略有两种:
  • updateTtlOnCreateAndWrite()
  • updateTtlOnReadAndWrite()

这两方法就是互斥的,只能生效一个,同样是因为二者都是对同一个变量进行赋值,就是说在二者同时调用的情况下,谁在后面调用谁就生效,如代码模板中线调用的updateTtlOnCreateAndWrite()后调用的updateTtlOnReadAndWrite()那么生效的就是updateTtlOnReadAndWrite()策略.

  • 源码

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法给updateType进行赋值@Nonnullpublic Builder setUpdateType(UpdateType updateType) {this.updateType = updateType;return this;}/** 二者方法体中调用的都是同一个方法setUpdateType*/@Nonnullpublic Builder updateTtlOnCreateAndWrite() {return setUpdateType(UpdateType.OnCreateAndWrite);}@Nonnullpublic Builder updateTtlOnReadAndWrite() {return setUpdateType(UpdateType.OnReadAndWrite);}// ...
    }
    

    源码可以看出二者调用同一个方法setUpdateType,而setUpdateType方法又是给updateType赋值的一个方法,所以再使用时要根据实际的业务场景选择updateTtlOnCreateAndWrite()updateTtlOnReadAndWrite()中的一个.

1.3.3 返回过期数据策略
回返过期数据策略有两种:
  • setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  • 源码

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法给stateVisibility进行赋值@Nonnullpublic Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {this.stateVisibility = stateVisibility;return this;}// 下面两个方法体中都是调用setStateVisibility方法@Nonnullpublic Builder returnExpiredIfNotCleanedUp() {return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);}@Nonnullpublic Builder neverReturnExpired() {return setStateVisibility(StateVisibility.NeverReturnExpired);}// ...
    }
    

    这二者同样是互斥的原则,使用选其一即可,即使都调用也是后被调用者生效.

1.3.4 过期数据清除策略
过期数据清除策略有三种:
  • cleanupIncrementally(10, false)
  • cleanupFullSnapshot()
  • cleanupInRocksdbCompactFilter(10000)

这三种过期数据清除策略针对的是不同的场景(本地状态后端、快照、RocksDB状态后端),所以三者是可以同时使用的,不会存在同时调用后者会对前者进行覆盖的问题,在API简介章节介绍了这种三策略的作用,这里着重介绍cleanupIncrementally策略.

HashMapStateBackend使用的存储结构是Flink团队自己开发的一种数据存储结构copyOnWriteStateMap,说这个存储结构是因为cleanupIncrementally策略删除过期数据的操作和这种结构息息相关.

关于copyOnWriteStateMap的结构可以简单的理解为K,V形式存储的结构,其中的Key就是使用keyBy时指定的key,如果没有使用keyBy那么所有数据key都会给一个相同的默认值,其中的Value是指ListStateMapState等,也就是在构建状态存储器时候选择存储形式,如下图:
ttl02

在本地状态后端(HashMapStateBackend)中默认使用的就是cleanupIncrementally清除策略,默认值为cleanupIncrementally(5, false),也就是说只要设置了TTL的过期时间,HashMapStateBackend就会使用cleanupIncrementally策略来清理过期数据,只不过cleanupIncrementally对用户提供了选择方式,这里将结合图解说明cleanupIncrementally如何清除过期数据的.
ttl02

  1. 只要访问状态数据就会触发cleanupIncrementally执行.
  2. 如果用户没有设置cleanupIncrementally,TTL会根据cleanupIncrementally(5, false)来删除过期数据,如果用户指定了参数则按照用户定义的参数删除数据.
  3. 比如现在是cleanupIncrementally(10, false),迭代器会从k1开始,到k10结束,将这10个条目的key中的ListState中的过期数据进行清理.
  4. CopyOnWriteStateMap中的数据存放是无序的,而且Flink在创建CopyOnWriteStateMap时候给的默认大小是128,也就说处理数据中key的数量超过128,否则就算只有一个key,CopyOnWriteStateMap的大小也是128,迭代器最少也要迭代128次.
  5. 当设置cleanupIncrementally(10, false)时,假如数据中只有一个key,那么这个k -> ListState(...)CopyOnWriteStateMap中的存放位置是任意的,假设在CopyOnWriteStateMap中存放的位置是22,就会出现当第一次访问状态数据时,并不会删除这个key对应的ListState中的数据,访问状态数据时同样还是不会删除过期数据,只有第三次访问时,才会删除过期数据,因为cleanupSize设置的大小为10,迭代器每次只会迭代10个条目的key,每当访问状态数据时,迭代器都会从最后一次迭代的指针位置开始继续推进.
  6. 当迭代器的指针推进位置到128时,又会从0的位置从新开始推进(这里是指CopyOnWriteStateMap的大小是128),以此类推.
  7. 如果cleanupIncrementally(10, true)中的runCleanupForEveryRecordtrue时,那就是说每次访问状态数据迭代器都会把CopyOnWriteStateMap中的所有条目都清理一遍,所以说为true时第一个参数(cleanupSize)会失效.

cleanupIncrementally的执行机制就很好的解释了,为什么在使用本地状态后端(HashMapStateBackend)时经常会出现明明已经来了7,8条数据,数据过期数据还没有清理到,或者距离上一次访问状态数据过了1h甚至更久都没有清理过期数据的情况.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/211044.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

window非gui形式运行jmeter脚本

配置jmeter环境 新增1个环境变量&#xff1a; JMETER_HOMED:\Tools\apache-jmeter-5.0 【jmeter文件夹】 编辑CLASSPATH&#xff1a; CLASSPATH后面加上 %JMETER_HOME%\lib\ext\ApacheJMeter_core.jar; %JMETER_HOME%\lib\jorphan.jar; 编辑path&#xff1a; path后面加上 %JM…

Java基层卫生健康云综合管理(云his)系统源码

云HIS&#xff08;Cloud-Based Healthcare Information System&#xff09;是基于云计算的医院健康卫生信息系统。它运用云计算、大数据、物联网等新兴信息技术&#xff0c;按照现代医疗卫生管理要求&#xff0c;在一定区域范围内以数字化形式提供医疗卫生行业数据收集、存储、…

基于C#实现赫夫曼树

赫夫曼树又称最优二叉树&#xff0c;也就是带权路径最短的树&#xff0c;对于赫夫曼树&#xff0c;我想大家对它是非常的熟悉&#xff0c;也知道它的应用场景&#xff0c;但是有没有自己亲手写过&#xff0c;这个我就不清楚了&#xff0c;不管以前写没写&#xff0c;这一篇我们…

可视化大屏时代的到来:智慧城市管理的新思路

随着科技的不断发展&#xff0c;智能芯片作为一种新型的电子元件&#xff0c;被广泛应用于各个领域&#xff0c;其中智慧芯片可视化大屏是一种重要的应用形式。 一、智慧芯片可视化大屏的优势 智慧芯片可视化大屏是一种将智能芯片与大屏幕显示技术相结合的产品&#xff0c;山海…

四肽-3——增加皮肤光滑度、紧致度,让肌肤看起来更年轻

Caprooyl四肽-3&#xff0c;也称为KGHK Caproic acid&#xff0c;是一种基于TGF-&#xff08;转化生长因子β&#xff09;的仿生脂肽结构&#xff0c;在细胞外基质成分的自然产生中发挥重要作用。肽序列是Lys-Gly-His-Lys。它可以减少细纹和皱纹的出现&#xff0c;提高皮肤弹性…

神经网络训练技巧

1. 逐渐增加训练数据规模&#xff0c;比如先在小数据集上训练&#xff0c;之后再增大数据集继续训练。

硬件开发笔记(十三):RK3568底板电路HDMI2.0模块原理图分析、HDMI硬件接口详解

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/134504319 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

Vatee万腾的数字时代探险:vatee科技力量的未来洞悉

在数字化的时代潮流中&#xff0c;Vatee万腾以其强大的科技力量&#xff0c;正在进行一场前所未有的数字时代探险。 Vatee万腾的数字时代探险源于其对未来的洞悉。通过深度研究和前瞻性思考&#xff0c;他们将科技力量与未来趋势相结合&#xff0c;勾勒出数字时代的新蓝图&…

自然资源土地管理法律法规知识竞赛这么办才高端

近些年&#xff0c;全国各地自然资源厅举办了土地管理法律法规知识竞赛&#xff0c;从我公司承办的这些赛事来看&#xff0c;传统的必答题、抢答题、风险题的方式已无法激起现场比赛气氛&#xff0c;需要更加复杂有趣的环节设置及高端竞赛软件及其配套设备加持才可以让知识竞赛…

【带头学C++】----- 八、C++面向对象编程 ---- 8.1 面向对象编程概述

目录 8.1 面向对象编程概述 8.1.1 面向对象概念&#xff08;OOP&#xff09; 8.1.2 面向过程概念 8.1 面向对象编程概述 8.1.1 面向对象概念&#xff08;OOP&#xff09; 面向对象&#xff08;Object-Oriented&#xff09;是一种编程范式&#xff0c;它将程序设计中的数据和…

自定义label组件

自定义label组件 支持边框绘制 支持shape背景(按指定圆角裁剪,矩形,圆角矩,圆形),支持指定角圆角 支持自定义阴影(颜色,偏移,深度) 边框颜色支持状态选择器 预览 核心绘制辅助类 public class LabelHelper {private final Paint paint;private Paint shadowPaint;private fina…