FlinkSQL ChangeLog

01 Changelog相关优化规则

0101 运行upsert-kafka作业

登录sql-client,创建一个upsert-kafka的sql作业(注意,这里发送给kafka的消息必须带key,普通只有value的消息无法解析,这里的key即是主键的值)

CREATE TABLE pageviews_per_region (user_region STRING,pv STRING,PRIMARY KEY (user_region) NOT ENFORCED  -- 设置主键
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = 'xxxxxx:9092','key.format' = 'csv','value.format' = 'csv'
);select * from pageviews_per_region;

发送消息带key和消费消息显示key方式如下

kafka-console-producer.sh --broker-list xxxxxx:9092 --topic pageviews_per_region --property "parse.key=true" --property "key.separator=:"
key1:value1,value1
key2:value2,value2kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic pageviews_per_region --from-beginning --property print.key=true

作业的DAG图如下
在这里插入图片描述

0102 StreamPhysicalChangelogNormalize

DAG图中有一个ChangelogNormalize,代码中搜索到对应的类是StreamPhysicalChangelogNormalize,这是一个对changelog数据做规范化的类,注释如下

/*** Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or a* changelog stream containing duplicate events. This node normalize such stream into a regular* changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without* duplication.*/
class StreamPhysicalChangelogNormalize(

功能就是转成对应的exec节点

override def translateToExecNode(): ExecNode[_] = {val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)new StreamExecChangelogNormalize(unwrapTableConfig(this),uniqueKeys,generateUpdateBefore,InputProperty.DEFAULT,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)
}

0103 StreamPhysicalTableSourceScanRule

StreamPhysicalChangelogNormalize是在优化规则StreamPhysicalTableSourceScanRule当中创建的,如下流式的FlinkLogicalTableSourceScan会应用该规则

class StreamPhysicalTableSourceScanRuleextends ConverterRule(classOf[FlinkLogicalTableSourceScan],FlinkConventions.LOGICAL,FlinkConventions.STREAM_PHYSICAL,"StreamPhysicalTableSourceScanRule") {

创建StreamPhysicalChangelogNormalize,也就是转为changelog的条件如下

if (isUpsertSource(resolvedSchema, table.tableSource) ||isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)
) {

isUpsertSource判断是否为upsert流,判断逻辑如下

public static boolean isUpsertSource(ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();boolean isUpsertMode =mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();return isUpsertMode && hasPrimaryKey;
}

其中ChangelogMode在各自数据源实现类的getChangelogMode接口中定义,如JDBC只支持insert

@Override
public ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();
}

isSourceChangeEventsDuplicate判断不是upsert的更改流,判断逻辑如下

public static boolean isSourceChangeEventsDuplicate(ResolvedSchema resolvedSchema,DynamicTableSource tableSource,TableConfig tableConfig) {if (!(tableSource instanceof ScanTableSource)) {return false;}ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();boolean isCDCSource =!mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);boolean changeEventsDuplicate =tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
}

综合来说要走StreamPhysicalChangelogNormalize这一条调用链,就不能是insertOnly的数据源,但目前大部分Flink实现的数据源包括Iceberg都是insertOnly的

0104 更新模式

Flink相关的更新模式类有如下几个:RowKind、ChangelogMode、UpdateKind

  • RowKind

RowKind是定义更新流每条数据的类型,其中对于更新有;两条数据,一条删除旧数据,一条插入新数据

/** Insertion operation. */
INSERT("+I", (byte) 0),/*** Update operation with the previous content of the updated row.** <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that* needs to retract the previous row first. It is useful in cases of a non-idempotent update,* i.e., an update of a row that is not uniquely identifiable by a key.*/
UPDATE_BEFORE("-U", (byte) 1),/*** Update operation with new content of the updated row.** <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that* needs to retract the previous row first. OR it describes an idempotent update, i.e., an* update of a row that is uniquely identifiable by a key.*/
UPDATE_AFTER("+U", (byte) 2),/** Deletion operation. */
DELETE("-D", (byte) 3);
  • ChangelogMode

ChangelogMode定义数据源的更新模式,主要三种,就是包含不同的RowKind的类型

private static final ChangelogMode INSERT_ONLY =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();private static final ChangelogMode UPSERT =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();private static final ChangelogMode ALL =ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_BEFORE).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(RowKind.DELETE).build();
  • UpdateKind

UpdateKind是针对update这种更新类型细分

/** NONE doesn't represent any kind of update operation. */
NONE,/*** This kind indicates that operators should emit update changes just as a row of {@code* RowKind#UPDATE_AFTER}.*/
ONLY_UPDATE_AFTER,/*** This kind indicates that operators should emit update changes in the way that a row of {@code* RowKind#UPDATE_BEFORE} and a row of {@code RowKind#UPDATE_AFTER} together.*/
BEFORE_AND_AFTER

02 StreamExecChangelogNormalize

StreamExecChangelogNormalize的处理流程中根据是否启用table.exec.mini-batch.enabled分为微批处理和单数据的流处理

微批处理使用ProcTimeMiniBatchDeduplicateKeepLastRowFunction,流式使用ProcTimeDeduplicateKeepLastRowFunction,两者的核心差别就是微批会缓存数据使用一个for循环处理

这两个函数除了StreamPhysicalChangelogNormalize这一条链路外,还有StreamExecDeduplicate这一条链路,对应StreamPhysicalRankRule,是一个排序的东西

for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {RowData currentKey = entry.getKey();RowData currentRow = entry.getValue();ctx.setCurrentKey(currentKey);if (inputInsertOnly) {processLastRowOnProcTime(currentRow,generateUpdateBefore,generateInsert,state,out,isStateTtlEnabled,equaliser);} else {processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);}
}
  • processLastRowOnProcTime

对数据按照时间语义进行去重,将当前数据作为最新,这个函数只针对insert only的数据

static void checkInsertOnly(RowData currentRow) {Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
}

整套处理逻辑就是对数据根据场景修改数据的RowKind类型

} else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow);
}
  • processLastRowOnChangelog

这个函数就是按Key去重,本质上也是针对数据修改RowKind

核心的一块功能就是更新的时候要将前一个数据修改为UPDATE_BEFORE

} else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow);
}

函数整体借用的是Flink的state功能,从状态中获取前面的数据,所以对状态缓存由要求;另外针对非删除型的数据,如果TTL没有开的话,就不会更新前面的数据

if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {// currentRow is the same as preRow and state cleaning is not enabled.// We do not emit retraction and update message.// If state cleaning is enabled, we have to emit messages to prevent too early// state eviction of downstream operators.return;
}

03 初始RowKind来源

前面的流程里,在进行changelog转换的时候,数据是已经存在一个RowKind的值了,这一章追踪初始RowKind的来源

基于Flink-27的设计,Kafka数据源处理任务有一个KafkaRecordEmitter,emitRecord当中做数据的反序列化

deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);

最终走到DeserializationSchema.deserialize完成最终的反序列化

default void deserialize(byte[] message, Collector<T> out) throws IOException {T deserialize = deserialize(message);if (deserialize != null) {out.collect(deserialize);}
}

这里message是一个二进制数组,实际是Kafka的数据类型ConsumerRecord。根据SQL当中的配置,value反序列化使用的是csv,所以走到CsvRowDataDeserializationSchema当中处理

final JsonNode root = objectReader.readValue(message);
return (RowData) runtimeConverter.convert(root);

这里读出来的root是数据的key,convert的转化的实现类是CsvToRowDataConverters,其createRowConverter接口当中创建了转化函数,函数中将数据转化为了Flink的数据类型GenericRowData

GenericRowData row = new GenericRowData(arity);

GenericRowData的定义当中,有初始化RowKind,就是insert

public GenericRowData(int arity) {this.fields = new Object[arity];this.kind = RowKind.INSERT; // INSERT as default
}

04 Iceberg流式更新

使用方式

CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://xxxx:19083','clientimecol'='5','property-version'='1','warehouse'='hdfs://nameservice/spark'  //是否HADOOP_CONF_DIR要export一下
);use CATALOG hive_catalog;CREATE TABLE test2(
id BIGINT COMMENT 'unique id',
data STRING,
primary key(id) not ENFORCED
);
ALTER TABLE test2 SET('format-version'='2');SET table.exec.iceberg.use-flip27-source = true;SELECT * FROM test2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ;

Iceberg流式更新目前只支持Append的数据,不支持更新删除

参考kafka,追踪IcebergSourceRecordEmitter,发现没有做数据转换,直接做了数据转发

public void emitRecord(RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {output.collect(element.record());split.updatePosition(element.fileOffset(), element.recordOffset());
}

数据格式的构建在更前面读数据的时候就完成了,读数据的核心逻辑在DataIterator

private void updateCurrentIterator() {try {while (!currentIterator.hasNext() && tasks.hasNext()) {currentIterator.close();currentIterator = openTaskIterator(tasks.next());fileOffset += 1;recordOffset = 0L;}} catch (IOException e) {throw new UncheckedIOException(e);}}
}

主要的功能类就是currentIterator,实现类为RowDataFileScanTaskReader,最终调用下一层iterator,下一层的实现类根据文件类型不同,parquet的实现类为ParquetReader,在next中读取数据

public T next() {if (valuesRead >= nextRowGroupStart) {advance();}if (reuseContainers) {this.last = model.read(last);} else {this.last = model.read(null);}valuesRead += 1;return last;
}

model实现类为ParquetValueReaders

public final T read(T reuse) {I intermediate = newStructData(reuse);for (int i = 0; i < readers.length; i += 1) {set(intermediate, i, readers[i].read(get(intermediate, i)));// setters[i].set(intermediate, i, get(intermediate, i));}return buildStruct(intermediate);
}

newStructData构建数据,创建了GenericRowData

protected GenericRowData newStructData(RowData reuse) {if (reuse instanceof GenericRowData) {return (GenericRowData) reuse;} else {return new GenericRowData(numFields);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/503591.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python3.x的在线与离线安装纯净版

由于计划搭建一套使用python自动分析日志的流程&#xff0c;发现我们的测试环境CentOS 7仍然没有安装python3&#xff0c;无法使用这些新的库。Python 3在设计上着重提升了语言的一致性和易用性&#xff0c;它引入了许多关键改进&#xff0c;此外&#xff0c;Python 3环境拥有丰…

机器学习 | 模型性能评估

目录 一. 回归模型的性能评估1. 平均平方误差(MSE)2. 平均绝对误差(MAE)3. R 2 R^{2} R2 值3.1 R 2 R^{2} R2优点 二. 分类模型的性能评估1. 准确率&#xff08;Accuracy&#xff09;2. 召回率&#xff08;Recall&#xff09;3. 精确率&#xff08;Precision&#xff09;4. …

sqlserver unique约束示例

UNIQUE 和 PRIMARY KEY 约束均为列或列集合提供了唯一性的保证。 PRIMARY KEY 拥有自动定义的 UNIQUE 约束。 与主键约束类似&#xff0c;唯一约束也强制唯一性&#xff0c;但唯一约束用于非主键的一列或者多列的组合&#xff0c;且一个表可以定义多个唯一约束。 有如下表&…

Linux运维-Web服务器的配置与管理(Apache+tomcat)(没成功,最后有失败经验)

Web服务器的配置与管理(Apachetomcat) 项目场景 公司业务经过长期发展&#xff0c;有了很大突破&#xff0c;已经实现盈利&#xff0c;现公司要求加强技术架构应用功能和安全性以及开始向企业应用、移动APP等领域延伸&#xff0c;此时原来开发web服务的php语言已经不适应新的…

02-prometheus监控-服务器节点监控node-exporter

一、概述 prometheus&#xff0c;本身是一个【数据收集】和【数据处理】的工具&#xff0c;如果效果要监控一台服务器物理机&#xff0c;有两种方式&#xff0c;一种是在物理机上部署“node-export”来收集数据上报给prometheus&#xff0c;另一种是“自定义监控”&#xff1b;…

PowerDesigner中怎么给ER图中字段设置默认值

双击table&#xff0c;进入数据库表详情页 详情页点击【Columns】 双击你要设置默认值得栏目&#xff0c;例如我得删除标记 点击【Standard Checks】&#xff0c;在【Defalut】中录入你想要得默认值&#xff0c;点击【应用即可】

Window10安装ruby

最好的方法&#xff0c;使用rubyinstaller&#xff0c;即在Downloads。 这是官方推荐的安装方式 通常来说我们会下载64位的 下载完后执行下载的exe即可。在最后一步会提示让安装gem&#xff0c;选则安装即可。 然后就可以在控制台进行测试了。

UniStorm - Volumetric Clouds, Sky, Modular Weather, and Cloud Shadows

UniStorm 是一款 AAA 动态天空、天气、云阴影和程序化体积云的终极解决方案。UniStorm 包含 100 多个可自定义的特色选项来使你的环境栩栩如生。甚至云的外观也可以自定义。聆听用户的反馈使 UniStorm 变得更加人性化以及简易使用。UniStorm 的功能非常强大,并且给你选项来调整…

FPGA开源项目分享——2D N-Body重力模拟器

​导语 今天继续康奈尔大学FPGA 课程ECE 5760的典型案例分享——2D N-Body重力模拟器。 &#xff08;更多其他案例请参考网站&#xff1a; Final Projects ECE 5760&#xff09; 1. 项目概述 项目网址 Grav Sim 项目说明 该项目的目标是创建一个用DE1-SOC进行硬件加速的2…

一周学会Django5 Python Web开发-Django5详细视图DetailView

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计28条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

centos7升级openssl_3

1、查看当前openssl版本 openssl version #一般都是1.几的版本2、下载openssl_3的包 wget --no-check-certificate https://www.openssl.org/source/old/3.0/openssl-3.0.3.tar.gz#解压 tar zxf openssl-3.0.3.tar.gz#进入指定的目录 cd openssl-3.0.33、编译安装遇到问题缺…

基于JAVA的聊天(ICQ)系统的设计于实现

一、绪论 ICQ是英文"I seek you "的简称&#xff0c;中文意思是我找你。ICQ最大的功能就是即时信息交流&#xff0c;只要记得对方的号码&#xff0c;上网时可以呼他&#xff0c;无论他在哪里&#xff0c;只要他上网打开ICQ&#xff0c;人们就可以随时交流。ICQ源于以…