Flink学习笔记(二):Flink内存模型

文章目录

  • 1、配置总内存
  • 2、JobManager 内存模型
  • 3、TaskManager 内存模型
  • 4、图形化展示
  • 5、实际案例计算内存分配

1、配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。详细的配置参数:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/deployment/config.html

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项TaskManager 配置参数JobManager 配置参数
Flink 总内存taskmanager.memory.flink.sizejobmanager.memory.flink.size
进程总内存taskmanager.memory.process.sizejobmanager.memory.process.size

Flink 启动需要明确配置:

TaskManagerJobManager
taskmanager.memory.flink.sizejobmanager.memory.flink.size
taskmanager.memory.process.sizejobmanager.memory.process.size
taskmanager.memory.task.heap.size 和
taskmanager.memory.managed.sizejobmanager.memory.heap.size

不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。
JVM参数

2、JobManager 内存模型

JobManager内存模型
如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
JVM 堆内存jobmanager.memory.heap.sizeJobManager 的 JVM 堆内存。
堆外内存jobmanager.memory.off-heap.sizeJobManager 的堆外内存(直接内存或本地内存)。
JVM Metaspacejobmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销jobmanager.memory.jvm-overhead.min、jobmanager.memory.jvm-overhead.max、jobmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

如配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。

3、TaskManager 内存模型

内存模型详解
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分配置参数描述
框架堆内存(Framework Heap Memory)taskmanager.memory.framework.heap.size用于 Flink 框架的 JVM 堆内存(进阶配置)。
任务堆内存(Task Heap Memory)taskmanager.memory.task.heap.size用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory)taskmanager.memory.managed.size、taskmanager.memory.managed.fraction由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
框架堆外内存(Framework Off-heap Memory)taskmanager.memory.framework.off-heap.size用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。
任务堆外内存(Task Off-heap Memory)taskmanager.memory.task.off-heap.size用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。
网络内存(Network Memory)taskmanager.memory.network.min、taskmanager.memory.network.max、taskmanager.memory.network.fraction用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。
JVM Metaspacetaskmanager.memory.jvm-metaspace.sizeFlink JVM 进程的 Metaspace。
JVM 开销taskmanager.memory.jvm-overhead.min、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.fraction用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。

我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。通常情况下,不建议对框架堆内存和框架堆外内存进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。

4、图形化展示

JobManager 内存直观展示
Job
TaskManager 内存直观展示
task
树状图表示:
树状图

5、实际案例计算内存分配

如果是 Flink On YARN 模式下:

taskmanager.memory.process.size = 4096 MB = 4G
taskmanager.memory.network.fraction = 0.15
taskmanager.memory.managed.fraction = 0.45

然后根据以上参数,就可以计算得到各部分的内存大小:

taskmanager.memory.jvm-overhead = 4096 * 0.1 = 409.6 MB
taskmanager.memory.flink.size = 4096 - 409.6 - 256 = 3430.4 MB
taskmanager.memory.network = 3430.4 * 0.15 = 514.56 MB
taskmanager.memory.managed = 3430.4 * 0.45 = 1543.68 MB
taskmanager.memory.task.heap.size = 3430.4 - 128 * 2 - 1543.68 - 514.56 = 1116.16 MB

实际案例

Flink 启动参考配置参数:

/home/dev/soft/flink/bin/flink run \-m yarn-cluster \-yD akka.ask.timeout='360 s' \-yD akka.framesize=20485760b \-yD blob.fetch.backlog=1000 \-yD blob.fetch.num-concurrent=500 \-yD blob.fetch.retries=50 \-yD blob.storage.directory=/data1/flinkdir \-yD env.java.opts.jobmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -XX:G1HeapWastePercent=5 -XX:G1ReservePercent=25 -Dfile.encoding=UTF-8' \-yD env.java.opts.taskmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -Dsun.security.krb5.debug=false -Dfile.encoding=UTF-8' \-yD env.java.opts='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -Dfile.encoding=UTF-8' \-yD execution.attached=false \-yD execution.buffer-timeout='1000 ms' \-yD execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \-yD execution.checkpointing.interval='30 min' \-yD execution.checkpointing.max-concurrent-checkpoints=1 \-yD execution.checkpointing.min-pause='2 min' \-yD execution.checkpointing.mode=EXACTLY_ONCE \-yD execution.checkpointing.timeout='28 min' \-yD execution.checkpointing.tolerable-failed-checkpoints=8 \-yD execution.checkpointing.unaligned=true \-yD execution.checkpointing.unaligned.forced=true \-yD heartbeat.interval=60000 \-yD heartbeat.rpc-failure-threshold=5 \-yD heartbeat.timeout=340000 \-yD io.tmp.dirs=/data1/flinkdir \-yD jobmanager.heap.size=1024m \-yD jobmanager.memory.jvm-metaspace.size=268435456b \-yD jobmanager.memory.jvm-overhead.max=1073741824b \-yD jobmanager.memory.jvm-overhead.min=1073741824b \-yD jobmanager.memory.network.fraction=0.2 \-yD jobmanager.memory.network.max=6GB \-yD jobmanager.memory.off-heap.size=134217728b \-yD jobmanager.memory.process.size='18360 mb' \-yD metrics.reporter.promgateway.deleteOnShutdown=true \-yD metrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \-yD metrics.reporter.promgateway.filter.includes=\*:dqc\*,uptime,taskSlotsTotal,numRegisteredTaskManagers,taskSlotsAvailable,numberOfFailedCheckpoints,numRestarts,lastCheckpointDuration,Used,Max,Total,Count,Time:gauge,meter,counter,histogram \-yD metrics.reporter.promgateway.groupingKey="yarn=${yarn};hdfs=${hdfs};job_name=TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.host=172.17.xxxx.xxxx \-yD metrics.reporter.promgateway.interval='60 SECONDS' \-yD metrics.reporter.promgateway.jobName="TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.port=10080 \-yD metrics.reporter.promgateway.randomJobNameSuffix=true \-yD pipeline.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD pipeline.object-reuse=true \-yD rest.flamegraph.enabled=true \-yD rest.server.numThreads=20 \-yD restart-strategy.failure-rate.delay='60 s' \-yD restart-strategy.failure-rate.failure-rate-interval='3 min' \-yD restart-strategy.failure-rate.max-failures-per-interval=3 \-yD restart-strategy=failure-rate \-yD security.kerberos.krb5-conf.path=/home/dev/kerberos/krb5.conf \-yD security.kerberos.login.contexts=Client,KafkaClient \-yD security.kerberos.login.keytab=/home/dev/kerberos/xxxx.keytab \-yD security.kerberos.login.principal=xxxx \-yD security.kerberos.login.use-ticket-cache=false \-yD state.backend.async=true \-yD state.backend=hashmap \-yD state.checkpoints.dir=hdfs://xxxx/flink/checkpoint/${jobName//.//}/$provId \-yD state.checkpoint-storage=filesystem \-yD state.checkpoints.num-retained=3 \-yD state.savepoints.dir=hdfs://xxxx/flink/savepoint/${jobName//.//}/$provId \-yD table.exec.hive.fallback-mapred-writer=false \-yD task.manager.memory.segment-size=4mb \-yD taskmanager.memory.framework.off-heap.size=1GB \-yD taskmanager.memory.managed.fraction=0.2 \-yD taskmanager.memory.network.fraction=0.075 \-yD taskmanager.memory.network.max=16GB \-yD taskmanager.memory.process.size='50 gb' \-yD taskmanager.network.netty.client.connectTimeoutSec=600 \-yD taskmanager.network.request-backoff.max=120000 \-yD taskmanager.network.retries=100 \-yD taskmanager.numberOfTaskSlots=10 \-yD web.timeout=900000 \-yD web.upload.dir=/data1/flinkdir \-yD yarn.application.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD yarn.application.queue=$yarnQueue \-yD yarn.application-attempts=10 \

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/127716.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件安全测试的含义是什么?

早在信息行业发展的初期,就有互联网公司意识到了保护软件产品安全的重要性,可时至今日,我们总能够从报道中瞥见某某公司遭黑客入侵导致信息泄露的新闻,其中甚至不乏Facebook这些以用户通讯、身份信息为支柱产业的大厂。 信息安全事…

实现协议互通:探索钡铼BL124EC的EtherCAT转Ethernet/IP功能

钡铼BL124EC是一种用于工业网络通信的网关设备,专门用于将EtherCAT协议转换成Ethernet/IP协议。它充当一个桥梁,连接了使用不同协议的设备,使它们能够无缝地进行通信和互操作。 具体来说,BL124EC通过支持EtherCAT(以太…

【重拾C语言】六、批量数据组织(二)线性表——分类与检索(主元排序、冒泡排序、插入排序、顺序检索、对半检索)

目录 前言 六、批量数据组织——数组 6.1~3 数组基础知识 6.4 线性表——分类与检索 6.4.1 主元排序 6.4.2 冒泡排序 6.4.3 插入排序 6.4.4 顺序检索(线性搜索) 6.4.5 对半检索(二分查找) 算法比较 前言 线性表是一种常…

vue解决:Parsing error: No Babel config file detected for ....

报错信息 Parsing error: No Babel config file detected for C:\Users\Admin\Desktop\shabi\work\src\App.vue. Either disable config file checking with requireConfigFile: false, or configure Babel so that it can find the config files. 分析错误:没有检测…

1154. 一年中的第几天

1154. 一年中的第几天 C代码: int dayOfYear(char * date){int year atoi(date);int month atoi(date 5); // 2019-01-09int day atoi(date 8);int amount[12] {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};if (year % 400 0 || (year % 4 0 &&…

智能AI系统源码ChatGPT系统源码+详细搭建部署教程+AI绘画系统+已支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统,支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Chat…

Spring AOP 详解及@Trasactional

Spring AOP 详解 AOP基础 AOP: Aspect Oriented Program, 面向切面编程。解耦(组织结构调整)、增强(扩展)。 AOP术语 术语 说明 Aspect(切面) 横切于系统的连接点实现特定功能的类 JoinPoint&#xf…

Day-08 基于 Docker安装 Nginx 镜像-负载均衡

1、反向代理后,自然而然就引出了负载均衡,下面简单实现负载均衡的效果; 2、实现该效果需要再添加一个 Nginx ,所以要增加一个文件夹。 /home|---mutou|----nginx|----conf.d|----html|----conf.d2|----html3 1.创建 html3 文件夹, 新建 index…

10.5 认识XEDParse汇编引擎

XEDParse 是一款开源的x86指令编码库,该库用于将MASM语法的汇编指令级转换为对等的机器码,并以XED格式输出,目前该库支持x86、x64平台下的汇编编码,XEDParse的特点是高效、准确、易于使用,它可以良好地处理各种类型的指…

JOSEF约瑟 闭锁继电器 LB-7 YDB-100 100V 50HZ 控制断路器的合闸或跳闸

闭锁继电器LB-7导轨安装名称:闭锁继电器型号:LB-7闭锁继电器额定电压100V功率消耗≤10VA触点容量220V1.5A40W返回系数≥0.8 LB-1A、LB-1D、DB-1、HBYB-102/D YDB-100、HLO、DB-100、LB-7型闭锁继电器 一、用途 LB-7型闭锁继电器(以下简称继电器)用于发电厂及变电所内高压母线…

【Redis实战】击穿+雪崩+穿透

架构 短信登录 基于session实现登录 流程图 代码实现 Slf4j Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {/*** session用户key*/public static final String USER_CONSTANT "user";Overridepub…

Mate 60上的一个贴心小功能!帮你避免消息通知被别人看到的尴尬!

当手机越来越融入生活&#xff0c;包含的个人隐私信息也就越来越多。想必不少人都经历过类似的场景&#xff1a; 手机被同事或者朋友暂时借用&#xff0c;但此时消息弹窗弹出一条即时聊天信息。 又或者拍合照、多人用手机看视频、多人观剧时&#xff0c;聊天框突然弹出消息。…