基于数据湖的多流拼接方案-HUDI实操篇

目录

一、前情提要

二、代码Demo

(一)多写问题

(二)如果要两个流写一个表,这种情况怎么处理?

 (三)测试结果

三、后序


一、前情提要

基于数据湖对两条实时流进行拼接(如前端埋点+服务端埋点、日志流+订单流等);

基础概念见前一篇文章:基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客

二、代码Demo

下文demo均使用datagen生成mock数据进行测试,如到生产,改成Kafka或者其他source即可。

第一个job:stream A,落hudi表:

        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCP);  // 1senv.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);   // 2 minenv.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);// env.getCheckpointConfig().setCheckpointStorage("file:///D:/Users/yakang.lu/tmp/checkpoints/");TableEnvironment tableEnv = StreamTableEnvironment.create(env);// datagen====================================================================tableEnv.executeSql("CREATE TABLE sourceA (\n" +" uuid bigint PRIMARY KEY NOT ENFORCED,\n" +" `name` VARCHAR(3)," +" _ts1 TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen', \n" +" 'fields.uuid.kind'='sequence',\n" +" 'fields.uuid.start'='0', \n" +" 'fields.uuid.end'='1000000', \n" +" 'rows-per-second' = '1' \n" +")");// hudi====================================================================tableEnv.executeSql("create table hudi_tableA(\n"+ " uuid bigint PRIMARY KEY NOT ENFORCED,\n"+ " age int,\n"+ " name VARCHAR(3),\n"+ " _ts1 TIMESTAMP(3),\n"+ " _ts2 TIMESTAMP(3),\n"+ " d VARCHAR(10)\n"+ ")\n"+ " PARTITIONED BY (d)\n"+ " with (\n"+ " 'connector' = 'hudi',\n"+ " 'path' = 'hdfs://ns/user/hive/warehouse/ctripdi_prodb.db/hudi_mor_mutil_source_test', \n"   // hdfs path+ " 'table.type' = 'MERGE_ON_READ',\n"+ " 'write.bucket_assign.tasks' = '10',\n"+ " 'write.tasks' = '10',\n"+ " 'write.partition.format' = 'yyyyMMddHH',\n"+ " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ " 'changelog.enabled' = 'true',\n"+ " 'index.type' = 'BUCKET',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1")+ " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"+ " 'hoodie.write.log.suffix' = 'job1',\n"+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"+ " 'hoodie.consistency.check.enabled' = 'false',\n"// + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"   // todo// + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"// + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"  //+ " 'hoodie.keep.min.commits' = '1440',\n"+ " 'hoodie.keep.max.commits' = '2880',\n"+ " 'compaction.schedule.enabled'='false',\n"+ " 'compaction.async.enabled'='false',\n"+ " 'compaction.trigger.strategy'='num_or_time',\n"+ " 'compaction.delta_commits' ='3',\n"+ " 'compaction.delta_seconds' ='60',\n"+ " 'compaction.max_memory' = '3096',\n"+ " 'clean.async.enabled' ='false',\n"+ " 'hive_sync.enable' = 'false'\n"// + " 'hive_sync.mode' = 'hms',\n"// + " 'hive_sync.db' = '%s',\n"// + " 'hive_sync.table' = '%s',\n"// + " 'hive_sync.metastore.uris' = '%s'\n"+ ")");// sql====================================================================StatementSet statementSet = tableEnv.createStatementSet();String sqlString = "insert into hudi_tableA(uuid, name, _ts1, d) select * from " +"(select *,date_format(CURRENT_TIMESTAMP,'yyyyMMdd') AS d from sourceA) view1";statementSet.addInsertSql(sqlString);statementSet.execute();

第二个job:stream B,落hudi表:

        StreamExecutionEnvironment env = manager.getEnv();env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCP);  // 1senv.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);   // 2 minenv.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);// env.getCheckpointConfig().setCheckpointStorage("file:///D:/Users/yakang.lu/tmp/checkpoints/");TableEnvironment tableEnv = StreamTableEnvironment.create(env);// datagen====================================================================tableEnv.executeSql("CREATE TABLE sourceB (\n" +" uuid bigint PRIMARY KEY NOT ENFORCED,\n" +" `age` int," +" _ts2 TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen', \n" +" 'fields.uuid.kind'='sequence',\n" +" 'fields.uuid.start'='0', \n" +" 'fields.uuid.end'='1000000', \n" +" 'rows-per-second' = '1' \n" +")");// hudi====================================================================tableEnv.executeSql("create table hudi_tableB(\n"+ " uuid bigint PRIMARY KEY NOT ENFORCED,\n"+ " age int,\n"+ " name VARCHAR(3),\n"+ " _ts1 TIMESTAMP(3),\n"+ " _ts2 TIMESTAMP(3),\n"+ " d VARCHAR(10)\n"+ ")\n"+ " PARTITIONED BY (d)\n"+ " with (\n"+ " 'connector' = 'hudi',\n"+ " 'path' = 'hdfs://ns/user/hive/warehouse/ctripdi_prodb.db/hudi_mor_mutil_source_test', \n"   // hdfs path+ " 'table.type' = 'MERGE_ON_READ',\n"+ " 'write.bucket_assign.tasks' = '10',\n"+ " 'write.tasks' = '10',\n"+ " 'write.partition.format' = 'yyyyMMddHH',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ " 'changelog.enabled' = 'true',\n"+ " 'index.type' = 'BUCKET',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1")+ " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"+ " 'hoodie.write.log.suffix' = 'job2',\n"+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"+ " 'hoodie.consistency.check.enabled' = 'false',\n"// + " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"   // todo// + " 'hoodie.write.lock.early.conflict.detection.strategy' = '"// + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"+ " 'hoodie.keep.min.commits' = '1440',\n"+ " 'hoodie.keep.max.commits' = '2880',\n"+ " 'compaction.schedule.enabled'='true',\n"+ " 'compaction.async.enabled'='true',\n"+ " 'compaction.trigger.strategy'='num_or_time',\n"+ " 'compaction.delta_commits' ='2',\n"+ " 'compaction.delta_seconds' ='90',\n"+ " 'compaction.max_memory' = '3096',\n"+ " 'clean.async.enabled' ='false'\n"// + " 'hive_sync.mode' = 'hms',\n"// + " 'hive_sync.db' = '%s',\n"// + " 'hive_sync.table' = '%s',\n"// + " 'hive_sync.metastore.uris' = '%s'\n"+ ")");// sql====================================================================StatementSet statementSet = tableEnv.createStatementSet();String sqlString = "insert into hudi_tableB(uuid, age, _ts1, _ts2, d) select * from " +"(select *, _ts2 as ts1, date_format(CURRENT_TIMESTAMP,'yyyyMMdd') AS d from sourceB) view2";// statementSet.addInsertSql("insert into hudi_tableB(uuid, age, _ts2) select * from sourceB");statementSet.addInsertSql(sqlString);statementSet.execute();

也可以将两个 writer 放到同一个app中(使用statement):

import java.time.ZoneOffset;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
// import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy;public class Test00 {public static void main(String[] args) {Configuration configuration = TableConfig.getDefault().getConfiguration();configuration.setString(TableConfigOptions.LOCAL_TIME_ZONE, ZoneOffset.ofHours(8).toString());//设置东八区// configuration.setInteger("rest.port", 8086);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration);env.setParallelism(1);env.enableCheckpointing(12000L);// env.getCheckpointConfig().setCheckpointStorage("file:///Users/laifei/tmp/checkpoints/");TableEnvironment tableEnv = StreamTableEnvironment.create(env);// datagen====================================================================tableEnv.executeSql("CREATE TABLE sourceA (\n" +" uuid bigint PRIMARY KEY NOT ENFORCED,\n" +" `name` VARCHAR(3)," +" _ts1 TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen', \n" +" 'fields.uuid.kind'='sequence',\n" +" 'fields.uuid.start'='0', \n" +" 'fields.uuid.end'='1000000', \n" +" 'rows-per-second' = '1' \n" +")");tableEnv.executeSql("CREATE TABLE sourceB (\n" +" uuid bigint PRIMARY KEY NOT ENFORCED,\n" +" `age` int," +" _ts2 TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen', \n" +" 'fields.uuid.kind'='sequence',\n" +" 'fields.uuid.start'='0', \n" +" 'fields.uuid.end'='1000000', \n" +" 'rows-per-second' = '1' \n" +")");// hudi====================================================================tableEnv.executeSql("create table hudi_tableA(\n"+ " uuid bigint PRIMARY KEY NOT ENFORCED,\n"+ " name VARCHAR(3),\n"+ " age int,\n"+ " _ts1 TIMESTAMP(3),\n"+ " _ts2 TIMESTAMP(3)\n"+ ")\n"+ " PARTITIONED BY (_ts1)\n"+ " with (\n"+ " 'connector' = 'hudi',\n"+ " 'path' = 'file:\\D:\\Ctrip\\dataWork\\tmp', \n"   // hdfs path+ " 'table.type' = 'MERGE_ON_READ',\n"+ " 'write.bucket_assign.tasks' = '2',\n"+ " 'write.tasks' = '2',\n"+ " 'write.partition.format' = 'yyyyMMddHH',\n"+ " 'write.partition.timestamp.type' = 'EPOCHMILLISECONDS',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ " 'changelog.enabled' = 'true',\n"+ " 'index.type' = 'BUCKET',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"// + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name|_ts2:age")// + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"+ " 'hoodie.write.log.suffix' = 'job1',\n"+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"+ " 'hoodie.consistency.check.enabled' = 'false',\n"+ " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"+ " 'hoodie.write.lock.early.conflict.detection.strategy' = '"// + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"  //+ " 'hoodie.keep.min.commits' = '1440',\n"+ " 'hoodie.keep.max.commits' = '2880',\n"+ " 'compaction.schedule.enabled'='false',\n"+ " 'compaction.async.enabled'='false',\n"+ " 'compaction.trigger.strategy'='num_or_time',\n"+ " 'compaction.delta_commits' ='3',\n"+ " 'compaction.delta_seconds' ='60',\n"+ " 'compaction.max_memory' = '3096',\n"+ " 'clean.async.enabled' ='false',\n"+ " 'hive_sync.enable' = 'false'\n"// + " 'hive_sync.mode' = 'hms',\n"// + " 'hive_sync.db' = '%s',\n"// + " 'hive_sync.table' = '%s',\n"// + " 'hive_sync.metastore.uris' = '%s'\n"+ ")");tableEnv.executeSql("create table hudi_tableB(\n"+ " uuid bigint PRIMARY KEY NOT ENFORCED,\n"+ " name VARCHAR(3),\n"+ " age int,\n"+ " _ts1 TIMESTAMP(3),\n"+ " _ts2 TIMESTAMP(3)\n"+ ")\n"+ " PARTITIONED BY (_ts2)\n"+ " with (\n"+ " 'connector' = 'hudi',\n"+ " 'path' = '/Users/laifei/tmp/hudi/local.db/mutiwrite1', \n"   // hdfs path+ " 'table.type' = 'MERGE_ON_READ',\n"+ " 'write.bucket_assign.tasks' = '2',\n"+ " 'write.tasks' = '2',\n"+ " 'write.partition.format' = 'yyyyMMddHH',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"+ " 'changelog.enabled' = 'true',\n"+ " 'index.type' = 'BUCKET',\n"+ " 'hoodie.bucket.index.num.buckets' = '2',\n"// + String.format(" '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), "_ts1:name|_ts2:age")// + " 'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "',\n"+ " 'hoodie.write.log.suffix' = 'job2',\n"+ " 'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',\n"+ " 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',\n"+ " 'hoodie.cleaner.policy.failed.writes' = 'LAZY',\n"+ " 'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',\n"+ " 'hoodie.consistency.check.enabled' = 'false',\n"+ " 'hoodie.write.lock.early.conflict.detection.enable' = 'true',\n"+ " 'hoodie.write.lock.early.conflict.detection.strategy' = '"// + SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName() + "',\n"+ " 'hoodie.keep.min.commits' = '1440',\n"+ " 'hoodie.keep.max.commits' = '2880',\n"+ " 'compaction.schedule.enabled'='true',\n"+ " 'compaction.async.enabled'='true',\n"+ " 'compaction.trigger.strategy'='num_or_time',\n"+ " 'compaction.delta_commits' ='2',\n"+ " 'compaction.delta_seconds' ='90',\n"+ " 'compaction.max_memory' = '3096',\n"+ " 'clean.async.enabled' ='false'\n"// + " 'hive_sync.mode' = 'hms',\n"// + " 'hive_sync.db' = '%s',\n"// + " 'hive_sync.table' = '%s',\n"// + " 'hive_sync.metastore.uris' = '%s'\n"+ ")");// sql====================================================================StatementSet statementSet = tableEnv.createStatementSet();statementSet.addInsertSql("insert into hudi_tableA(uuid, name, _ts1) select * from sourceA");statementSet.addInsertSql("insert into hudi_tableB(uuid, age, _ts2) select * from sourceB");statementSet.execute();}
}

(一)多写问题

由于HUDI官方提供的code打成jar包是不支持“多写”的,这里使用Tencent改造之后的code进行打包测试;

如果使用官方包,多个writer写入同一个hudi表,则会报如下异常:

而且:

hudi中有个preCombineField,在建表的时候只能指定其中一个字段为preCombineField,但是如果使用官方版本,双流写同一个hudi的时候出现两种情况:

1. 一条流写preCombineField,另一条流不写这个字段,后者会出现 ordering value不能为null;

2. 两条流都写这个字段,出现字段冲突异常;

(二)如果要两个流写一个表,这种情况怎么处理?

经过本地测试:

hudi0.12-multiWrite版本(Tencent修改版),可以支持多 precombineField(在此版本中,只要保证主键、分区字段之外的字段,在多个流中不冲突即可实现多写!)

hudi0.13版本,不支持,而且存在上述问题; 

 (三)测试结果

0

Tencent文章链接:https://cloud.tencent.com/developer/article/2189914

github链接:GitHub - XuQianJin-Stars/hudi at multiwrite-master-7

hudi打包很麻烦,如果需要我将后续上传打好的jar包;

三、后序

基于上述code,当流量比较大的时候,似乎会存在一定程度的数据丢失(在其中一条流进行compact,则另一条流就会存在一定程度的数据丢失);

可以尝试:

(1)先将两个流UNION为一个流,再sink到hudi表(也避免了写冲突);

(2)使用其他数据湖工具,比如apache paimon,参考:新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/87033.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

优化时间流:区间调度问题的探索与解决

在浩如烟海的信息时代,时间的有效管理成为了一门不可或缺的艺术。无论是生活中的琐事,还是工作中的任务,时间都在无声地流逝,挑战着我们的智慧。正如时间在日常生活中具有的宝贵价值一样,在计算机科学领域,…

大数据Flink实时计算技术

1、架构 2、应用场景 Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。在启用高可用选项的情况下,它不存在单点失效问题。事实证明&#…

IDEA创建Spring,Maven项目没有resources文件夹

有时新建Spring或Maven项目时,会出现目录中main下无resources文件夹的情况,来一起解决一下: FIles|Project Structure 在Modules模块找到对应路径,在main下创建resources,右键main,选择新文件夹 输入文件…

uniapp国际化npm install vue-i18n报错

npm install vue-i18n //npmyarn add vue-i18n //yarn在vue2环境下,默认安装 npm install vue-i18n 的版本是 vue-i18n9.1.9,所以报错。 npm view vue-i18n versions --json 用以上命令查看版本: vue2建议5.0版本 npm install vue-i1…

生态经济学领域里的R语言机器学(数据的收集与清洗、综合建模评价、数据的分析与可视化、数据的空间效应、因果推断等)

近年来,人工智能领域已经取得突破性进展,对经济社会各个领域都产生了重大影响,结合了统计学、数据科学和计算机科学的机器学习是人工智能的主流方向之一,目前也在飞快的融入计量经济学研究。表面上机器学习通常使用大数据&#xf…

深度学习-4-二维目标检测-YOLOv3理论模型

单阶段目标检测模型YOLOv3 R-CNN系列算法需要先产生候选区域,再对候选区域做分类和位置坐标的预测,这类算法被称为两阶段目标检测算法。近几年,很多研究人员相继提出一系列单阶段的检测算法,只需要一个网络即可同时产生候选区域并…

CTF-XXE(持续更新,欢迎分享更多相关知识点的题目)

知识 实例 BUU [PHP]XXE 进来看到 然后一起看 Write BUU XXE COURSE 1 进来看到 一起看 write NSS [NCTF2019]Fake XML cookbook 反正是XXE 直接整 write [NCTF 2019]True XML cookbook 不整花里胡哨,解题在最下面 write 与博主不同,我通过…

ES 7.6 - APi基础操作篇

ES7.6-APi基础操作篇 前言相关知识索引相关创建索引查询索引查询所有索引删除索引关闭与打开索引关闭索引打开索引 冻结与解冻索引冻结索引解冻索引 映射相关创建映射查看映射新增字段映射 文档相关(CURD)新增文档根据ID查询修改文档全量覆盖根据ID选择性修改根据条件批量更新 …

激活函数总结(二十二):激活函数补充(Soft Exponential、ParametricLinear)

激活函数总结(二十二):激活函数补充 1 引言2 激活函数2.1 Soft Exponential激活函数2.2 ParametricLinear激活函数 3. 总结 1 引言 在前面的文章中已经介绍了介绍了一系列激活函数 (Sigmoid、Tanh、ReLU、Leaky ReLU、PReLU、Swish、ELU、SE…

短视频矩阵源码saas开发搭建

一、 短视频矩阵系统源码开发部署步骤分享 确定开发环境:务必准备好项目的开发环境,包括操作系统、IDE、数据库和服务器等。 下载源码:从官方网站或者Github等平台下载短视频矩阵系统源码,并进行解压。 配置数据库:根…

vue3+ts+uniapp小程序端自定义日期选择器基于内置组件picker-view + 扩展组件 Popup 实现自定义日期选择及其他选择

vue3ts 基于内置组件picker-view 扩展组件 Popup 实现自定义日期选择及其他选择 vue3tsuniapp小程序端自定义日期选择器 1.先上效果图2.代码展示2.1 组件2.2 公共方法处理日期2.3 使用组件 3.注意事项3.1refSelectDialog3.1 backgroundColor"#fff" 圆角问题 自我记…

【Java架构-版本控制】-Git进阶

本文摘要 Git作为版本控制工具,使用非常广泛,在此咱们由浅入深,分三篇文章(Git基础、Git进阶、Gitlab搭那家)来深入学习Git 文章目录 本文摘要1. Git分支管理2. Git分支本质2.1 分支流转流程(只新增文件)2.2 分支流转流…