FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

FlinkSQL处理如下实时数据需求:
实时聚合不同 类型/账号/发布时间 的各个指标数据,比如:初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。

代码实例

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;CREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群参数
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 检查点配置
    在这里插入图片描述

  • job运行资源
    管理节点(JM) 1 个, 节点规格 1 核 4 GB内存, 磁盘 10Gi
    运行节点(TM)10 个, 节点规格 1 核 4 GB内存, 磁盘 80Gi
    单TM槽位数(Slot): 1
    默认并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{"app_cust_syyy_private_domain_syyy_group_msg": {"properties": {"send_type": {"type": "keyword","ignore_above": 256},"account_id": {"type": "keyword"},"publish_time": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis","ignore_malformed":"true" # 忽略错误的各式}}},"grouping_id": {"type": "integer"},"init": {"type": "integer"},"init_cancel": {"type": "integer"},"query": {"type": "integer"},"succ": {"type": "integer"},"fail": {"type": "integer"},"init_delete": {"type": "integer"},"update_time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}

性能调优

是否开启【MiniBatch 聚合】和【Local-Global 聚合】对分组聚合场景影响巨大,尤其是在数据量大的场景下。

  • 如果未开启,在分组聚合,数据更新状态时,每条数据都会触发聚合运算,进而更新StateBackend (尤其是对于 RocksDB StateBackend,火焰图上反映就是一直在update rocksdb),造成上游算子背压特别大。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。
    在这里插入图片描述

  • 在开启【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

开启配置好会在DAG上添加两个环节MiniBatchAssignerLocalGroupAggregate
在这里插入图片描述

对结果的影响

开启了【MiniBatch 聚合】和【Local-Global 聚合】后,一天处理不完的数据,在10分钟内处理完毕

输出结果

在这里插入图片描述在这里插入图片描述

参考:
Group Aggregation
Streaming Aggregation Performance Tuning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/416644.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决系统开发中的跨域问题:CORS、JSONP、Nginx

文章目录 一、概述1.问题场景2.浏览器的同源策略3.解决思路 二、一点准备工作1.创建前端工程12.创建后端工程3.创建前端工程24.跨域问题 三、方法1&#xff1a;使用CORS四、方法2&#xff1a;JSONP五、方法3&#xff1a;Nginx1.安装和启动&#xff08;windows&#xff09;2.使用…

TestNG注释

目录 TestNG注释列表 BeforeXXX和AfterXXX注释放在超类上时如何工作&#xff1f; 使用BeforeXXX和AfterXXX TestNG注释 TestNG是一个测试框架&#xff0c;旨在简化广泛的测试需求&#xff0c;从单元测试&#xff08;隔离测试一个类&#xff09;到集成测试&#xff08;测试由…

【LGR-172-Div.4】洛谷入门赛 #19(A—H,c++详解!)

文章目录 【LGR-172-Div.4】洛谷入门赛 #19A.分饼干 I题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示样例解释 1样例解释 2数据范围与约定思路: 代码 B.分饼干 II题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样…

书生·浦语大模型实战营第四次课堂笔记

先来看看参考作业 哈哈到这才想起来写笔记 倒回去看发现要求将不要葱姜蒜换成自己的名字和昵称&#xff01; 好好好我就是不配玩&#xff08;换成管理员也不行&#xff01;&#xff09; 诶怎么能进这个环境&#xff1f;要进双系统ubuntu&#xff1f; 现在看视频发现原来是…

HarmonyOS 页面跳转控制整个界面的转场动画

好 本文 我们来说 页面间的转场动画 就是 第一个界面到另一个界面 第一个界面的退场和第二个界面的进场效果 首先 我这里 创建了两个页面文件 Index.ets和AppView.ets index组件 编写代码如下 import router from "ohos.router" Entry Component struct Index {b…

鉴源实验室|自动驾驶仿真测试技术分析

01 引言 随着科技的不断发展&#xff0c;自动驾驶技术逐渐成为汽车行业的热门话题。然而&#xff0c;要将自动驾驶车辆投放到真实道路上之前&#xff0c;必须进行广泛的测试&#xff0c;以确保其在各种情况下都能安全可靠地运行。自动驾驶车辆的测试是一个复杂而昂贵的过程。…

大数据开发之Hadoop(完整版+练习)

第 1 章&#xff1a;Hadoop概述 1.1 Hadoop是什么 1、Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 2、主要解决&#xff0c;海量数据的存储和海量数据的分析计算问题。 3、Hadoop通常是指一个更广泛的概念-Hadoop生态圈 1.2 Hadoop优势&#xff08;4高&#xf…

2024年AMC8被强制提前交卷事件应该告一段落了,向前看吧孩子们

原以为到今天开考前一切和往年一样的2024年AMC8竞赛也会和往年一样&#xff08;真绕&#xff09;&#xff0c;没想到出了个大规模强制提前交卷时间&#xff0c;让很多家长和孩子不可理解、很难受&#xff0c;甚至有一些家长在收集证据&#xff0c;准备请相关部门去调查&#xf…

DC-3靶机刷题记录

靶机下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1-P5ezyt5hUbmmGMP4EI7kw?pwdrt2c 提取码&#xff1a;rt2c 参考&#xff1a; http://t.csdnimg.cn/hhPi8https://www.vulnhub.com/entry/dc-32,312/ 官网http://t.csdnimg.cn/5mVZ7DC-3 (1).pdfhttps://…

[AutoSar]BSW_OS 05 Autosar OS_schedule table

目录 关键词平台说明一、调度表 关键词 嵌入式、C语言、autosar、OS、BSW 平台说明 项目ValueOSautosar OSautosar厂商vector &#xff0c;芯片厂商TI 英飞凌编程语言C&#xff0c;C编译器HighTec (GCC) >>>>>回到总目录<<<<<< 一、调度表…

ThinkPad T14/T15/P14s/P15s gen2电脑原厂Win10系统镜像 恢复笔记本出厂时预装自带OEM系统

lenovo联想原装出厂Windows10系统&#xff0c;适用型号&#xff1a; ThinkPad T14 Gen 2&#xff0c;ThinPad T15 Gen 2&#xff0c;ThinkPad P14s Gen 2&#xff0c;ThinkPad P15s Gen 2 &#xff08;20W1,20W5,20VY,20W7,20W0,20W4,20VX,20W6&#xff09; 链接&#xff1…

el-date-picker组件设置时间范围限制

需求&#xff1a; 如图所示&#xff0c;下图为新增的一个弹层页面&#xff0c;同时有个需求&#xff0c;日期选择需要限制一个月的时间范围&#xff08;一月默认为30天&#xff09;&#xff1a; 查看官方文档我们需要主要使用到如下表格的一些东西&#xff1a; 参数说明类型可…