Flink实时任务性能调优

前言

通常我们在开发完Flink任务提交运行后,需要对任务的参数进行一些调整,通常需要调整的情况是任务消费速度跟不上数据写入速度,从而导致实时任务出现反压、内存GC频繁(FullGC)频繁、内存溢出导致TaskManager被Kill。

今天讲一下Flink任务中常见的性能场景及解决思路。

反压

在Flink任务中多个Task之间需要进行数据交换,在流式计算中数据的生产方的生产速度和消费方的消费速度不匹配时,可能会导致计算节点OOM或丢失数据,在Flink中通过反压机制平衡数据生产方和消费方的处理速度,以求系统达到整体的平衡。

实时任务出现反压时,在Blink版本中做了大量的改进,从资源使用、作业调优、日志查询等维度新增了大量功能,使得用户可以更方便的对Flink作业进行运维,Vertex 增加了InQueue,OutQueue等多项指标,可以方便的追踪数据的反压、过滤及倾斜情况通常,我们可以通过在Flink Web UI中观察出现红色的Vertex节点及其上下游,重点需要关注的指标是Out Queue的占用率,当Out Queue占用率高表示该节点的下游节点消费能力不足,需要重点调解该下游节点的计算资源(已贡献社区)。

如果是老的Flink版本,可以先在 Flink web ui 中,定位到具体的算子之后,查看 BackPressure 模块,通过颜色和数值来判断任务的繁忙和反压情况(若颜色为红色,表示当前算子繁忙,有反压的情况;若颜色为绿色,标识当前算子不繁忙,没有反压)。

如果你看到 subtasks 的状态为 OK 表示没有反压。HIGH 表示这个 subtask 被反压。状态用如下定义:

  • OK: 0% <= 反压比例 <= 10%
  • LOW: 10% < 反压比例 <= 50%
  • HIGH: 50% < 反压比例 <= 100%

常见场景及解决思路

场景一、任务反压(算子消费瓶颈)

典型场景为,一连串的计算节点都是红色,Out Queue都是100%,此时需要定位到最后一个Out Queue为100%的算子节点的下游节点,该节点的消费能力不达标,导致上游消息堆积。我们可以对该算子的资源进行调整,如 适当调大并发度,对应内存可适当调小,如果是窗口聚合节点则可以调大内存(在开窗场景下,window数据计算节点需要缓存窗口大小时长的数据,并在checkpoint时需要将窗口的中间状态存储,因此需要增加窗口计算节点的堆内存)

场景二、任务无反压,但延迟高(source端瓶颈)

这种情况表现为,整体没有出现明显反压,即所有计算节点的Out Queue都不高。

这种情况的出现,有可能是上游源头节点的并发度不够,如kafka的topic有三个分区,消费的时候,只开了一个并发,通常建议消费并发数和topic的分区一致。

如果增加source的并发度之后,延迟没有下降,则可能是在任务源头节点包含复杂计算,且该算子和源头并发一致,出现了合并任务链(operater chain),此时可以考虑将source算子单独剥离出来,即调整source下游算子的并发度,解除合并任务链。

场景三、任务异常(内存超用)

实时任务异常Failover的情况下,我们需要关注任务是否因为某个TaskManager内存超用被kill的情况,如果发现异常日志中记录了:

"org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'null'. This might indicate that the remote task manager was lost"

则普遍情况是因为内存超用,我们需要根据异常信息中提示的任务节点,调整执行计划中对应节点的内存配置,具体可在WebUI中查看Exceptions模块中查看,其中Root Exception里面记录了最新一次发生的异常栈,Exception History中记录的是任务运行过程中所有发生的异常,以及每次异常的计算节点是哪些。

场景四、GroupBy

针对group by场景,可以通过配置minibatch,来提升吞吐,降低状态的访问,减少对下游的输出压力。

在Stram SQL纯流模式下,每进来一条数据都会去操作state,IO消耗较大,设置minibatch后,同一个key的一批数据只访问一次state,且只输出最新的一条数据,即减少了state的访问也减少了向下游的数据更新,minibatch的配置如下:

# 1. 表示整个job允许延迟
blink.miniBatch.allowLatencyMs=5000# 2. 单个batch的size
blink.miniBatch.size=1000

场景五、任务重启,并设置重启时间(初始时间)

这种情况一般出现在任务刚启动时有非常高的延迟,可能是因为在任务启动时或重启时设置了一个比较老的start time,导致任务从很早的时间开始拉取数据,会导致刚开始整个任务的qps非常高,在监控上的表现为一开始有很高的延迟,随后缓慢下降直到正常水平,若没有下降则可以适当增加资源,一般来说这种情况不需要特殊处理,可以根据实际需求来判断是否需要调整start time为当前时间。

场景六、Time Interval Join 代替 双流Join

建议在双流join的时候,使用时间窗口join,而不是双流join。

默认情况下双流join会将两条流的数据都缓存到状态中,默认状态存储时长为1.5天,状态太大会导致join算子性能低下。

而实际上大部分场景,join都是由时效性要求的,比如商品曝光1分钟引导的点击,其业务上隐含了数据的时效性关联条件,当数据失效后,它的状态是可以清理掉释放资源。

 

总结

  1. 判断是否出现反压,在反压节点定位算子,增加并发或调整cpu资源;
  2. 若无明显反压,则可能是source端瓶颈,可以提升并发度,尽量和source源的分区数量一致,另外可以查看是否是因为source数据处理的算子逻辑太复杂,且和读算子并行一致出现合并任务链(operater chain)的情况,此时可以调整该计算算子的并行度,将source算子剥离出链。
  3. 参数优化,配置minibatch(针对GroupBy),可提升吞吐,降低状态的访问次数,减少对下游的输出压力。
  4. 双流join场景中使用Time Interval Join,而不是双流Join,双流Join会把状态保持1.5天,非常消耗资源。
  5. 重置任务时,根据实际需求出发,若默认很久以前的数据可放弃,则可以调整start time为较近的时间。
  6. 提升batchSize增加读写IO。

希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/19724.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV 入门教程: Harris角点检测

OpenCV 入门教程&#xff1a; Harris 角点检测 导语一、Harris角点检测原理二、Harris角点检测步骤三、示例应用总结 导语 Harris 角点检测是图像处理中常用的角点检测算法&#xff0c;用于寻找图像中的角点特征。角点是图像中具有明显边缘变化的位置&#xff0c;具有独特性和不…

踩坑记录:xorm的sql()函数后无法使用FindAndCount()

一、起因 懒省事想用 sql 写个有连表操作的分页查询语句&#xff0c;看到 xorm 中 sql 要和代码紧密纠缠在一起就不爽&#xff0c;所有就想用 xorm 中的 session.SQL(sql).Limit(size, offset).FindAndCount() 方法解决。不曾想。。。 sql: expected 15 destination argument…

【搜索引擎Solr】Solr:提高批量索引的性能

几个月前&#xff0c;我致力于提高“完整”索引器的性能。我觉得这种改进足以分享这个故事。完整索引器是 Box 从头开始创建搜索索引的过程&#xff0c;从 hbase 表中读取我们所有的文档并将文档插入到 Solr 索引中。 我们根据 id 对索引文档进行分片&#xff0c;同样的文档 id…

【springboot】RestTemplate序列化RedisSerializer到底该选哪个

RedisTemplate是Spring Data Redis提供给用户的最高级的抽象客户端&#xff0c;用户可直接通过RedisTemplate对Redis进行多种操作。 在项目中使用需要引入如下依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>…

IDEA自动添加注释作者版本时间等信息

File | Settings | Editor | Live Templates 点击加号&#xff0c;选择第二项 设置一个名称 再次点击加号&#xff0c;选择第一项 填写名称&#xff08;设置完成后再代码中输入该名称即可插入该注释内容&#xff09;&#xff0c;描述&#xff0c;及内容 /*** author 名字…

机器学习技术(五)——特征工程与模型评估

机器学习技术&#xff08;五&#xff09;——特征工程与模型评估(2️⃣) 文章目录 机器学习技术&#xff08;五&#xff09;——特征工程与模型评估(:two:)二、模型评估1、Accuracy score2、Confusion matrix混淆矩阵1、多值2、二值 3、Hamming loss4、Precision, recall and F…

list分段截取方法

对list 分段截取方法是一个常见的操作&#xff0c;通常用于对list数据批量操作&#xff0c;常见的场景有返回分页展示数据&#xff0c;对大数据进行分批次插入数据库等 package com.hmdp.dto;import org.apache.commons.collections4.ListUtils; import org.springframework.u…

Oracle中没有show tables;如何用指令来显示表名,Excel关于VLOOKUP函数的使用。

一、问题&#xff1a;Oracle中没有show tables;如何用指令来显示表名。 解决方案&#xff1a; owner NAPSDEV更换为owner CNAPSIIDB。NAPSDEV是用户名&#xff0c;CNAPSIIDB是数据库名。在这里&#xff0c;我想让它显示的是我在Navicat中的CNAPSIIDB数据库下的所有表的名称。所…

【数据仓库】Windows源码安装DataEase,DataEase二次开发

上文记录了DataEase入门使用指南&#xff0c;本文主要记录Windows下源码安装及二次开发步骤【数据仓库】BI看板DataEase入坑指南_wenchun001的博客-CSDN博客 改动文件 源码 GitHub release 链接: Releases dataease/dataease GitHub SDK 软件环境 后端&#xff1a; JDK …

【云原生】二进制部署k8s集群(中)搭建node节点

连接上文 在上文已经成功部署了etcd分布式数据库、master01节点&#xff0c; 本文将承接上文的内容&#xff0c;继续部署Kubernetes集群中的 worker node 节点和 CNI 网络插件 1. 部署 Worker Node 组件 1.1 work node 组件部署前需了解的节点注册机制 kubelet 采用 TLS Bo…

实操:用Flutter构建一个简单的微信天气预报小程序

​ 微信小程序是一种快速、高效的开发方式&#xff0c;Flutter则是一款强大的跨平台开发框架。结合二者&#xff0c;可以轻松地开发出功能丰富、用户体验良好的微信小程序。 这里将介绍如何使用Flutter开发一个简单的天气预报小程序&#xff0c;并提供相应的代码示例。 1. 准备…

【数学建模】常微分方程

常微分方程 博客园解释 https://www.cnblogs.com/docnan/p/8126460.html https://www.cnblogs.com/hanxi/archive/2011/12/02/2272597.html https://www.cnblogs.com/b0ttle/p/ODEaid.html matlab求解常微分方程 https://www.cnblogs.com/xxfx/p/12460628.html https://www.cn…