5 Paimon数据湖之表数据查询详解

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

虽然前面我们已经讲过如何查询Paimon表中的数据了,但是有一些细节的东西还需要详细分析一下。

  • 首先是针对Paimon中系统表的查询,例如snapshots\schemas\options等等这些系统表。
    其实简单理解就是我们可以通过sql的形式查询系统表来查看实体表的快照、schema等信息,这些信息我们也可以直接到hdfs中查看,只是不太方便。

  • 在查询数据的时候,可以细分为批量读取和流式读取,因为Paimon可以同时支持批处理和流处理。

  • 在查询数据的时候,如果想要从之前的某一个时间点开始查询数据,也就说任务启动的时候想要查询一些历史数据,则需要用到时间旅行这个特性,可以在SQL查询语句中通过动态表选项指定scan.mode参数来控制具体查询哪些历史数据。

Scan Mode的值可以有多种,不同的值代表不同的含义,下面我们来具体分析一下:
在这里插入图片描述

注意:在分析的时候,我们需要针对批处理和流处理这两种情况分别进行分析。

  • (1)default:如果我们在执行查询的时候,没有指定scan.mode参数,则默认是default。但是此时需要注意,如果我们也没有同时指定其他参数,例如:timestamp-millis\snapshot-id等scan相关的参数,那么默认会执行latest-full策略。
    所以说,我们在执行查询的时候,如果没有指定任何scan相关的参数,那么默认执行的策略就是latest-full。

  • (2)latest-full:和full是一样的效果,不过full这个参数已经被标记为过期了。针对批处理,表示只读取最新快照中的所有数据,读取完成以后任务就执行结束了。针对流处理,表示第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据,这个任务会一直运行。

  • (3)latest:针对批处理,他的执行效果和latest-full是一样的,只会读取最新快照中的所有数据。但是针对流处理就不一样了,此时表示只读取最新的变更数据,也就是说任务启动之后,只读取新增的数据,之前的历史快照中的数据不读取。类似于kafka中消费者里面的latest消费策略。

  • (4)from-snapshot:使用此策略的时候,需要同时指定snapshot-id参数。针对批处理,表示只读取指定id的快照中的所有数据。针对流处理,表示从指定id的快照开始读取变更数据(注意:此时不是读取这个快照中的所有数据,而是读取此快照中的变更数据,也可以理解为这个快照和上一个快照相比新增的数据),当然,后续新增的变更数据也是可以读取到的,因为这个是流处理,他会一直执行读取操作。

  • (5)from-snapshot-full:使用此策略的时候,也需要同时指定snapshot-id参数。针对批处理,他的执行效果和from-snapshot是一样的。针对流处理,表示第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据,此时任务会一直执行。

  • (6)from-timestamp:使用此策略的时候,需要同时指定timestamp-millis参数。针对批处理,表示只读取指定时间戳的快照中的所有数据。针对流处理,表示从指定时间戳的快照开始读取变更数据,(注意,这里也是读取这个快照中的变更数据,不是所有数据。),然后读取后续新增的变更数据。

  • (7)incremental:表示是增量查询,这个主要是针对批处理的,通过这种策略可以读取开始和结束快照之间的增量变化。开始和结束快照可以通过快照id或者是时间戳进行指定。
    如果是使用快照id,则需要通过incremental-between参数指定。
    如果是使用时间戳,则需要通过incremental-between-timestamp参数指定。

  • (8)compacted-full:想要使用这个参数有一个前提,Paimon表需要开启完全压缩(full compaction)。此时针对批处理,表示只读取最新完全压缩(full compaction)的快照中的所有数据。针对流处理,表示第一次启动时读取最新完全压缩(full compaction)的快照中的所有数据,然后继续读取后续新增的变更数据。

针对这里面的latest、latest-full、compacted-full这几种策略放在一起可能容易混淆,下面我们来通过一个图重新梳理一下:
在这里插入图片描述

首先看中间这条线,表示是数据的时间轴,左边是历史数据,右边是最新产生的数据。

中间这条线上面是批处理,下面是流处理。

我们首先来看批处理:
如果我们指定了scan.modelatest-full或者是latest,则会读取最新的快照中的所有数据,也就是Last Snapshot中的数据。
如果我们指定了scan.modecompacted-full,则会读取最新的完全压缩(full compaction)的快照中的数据,也就是Last Compact Snapshot中的数据。

接下来看一下流处理:
如果我们指定了scan.modelatest-full,则会在任务第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据。也就是第一次启动时先读取Last Snapshot中的所有数据,接着读取后续新产生的数据。
如果我们指定了scan.modelatest,则此时只读取最新的变更数据,不读取LastSnapshot快照中的数据。
如果我们指定了scan.modecompacted-full,则第一次启动时会读取最新完全压缩(full compaction)的快照中的所有数据,也就是Last Compact Snapshot中的数据,接着读取后续新产生的数据。

这就是这些策略在批处理和流处理中的执行流程。

(1)查询系统表

下面我们来通过具体的案例来演示一下前面提到的查询数据相关的用法。

首先创建一个向Paimon表中模拟写入数据的类,便于一会测试使用
创建package:tech.xuwei.paimon.query

创建object:FlinkSQLWriteToPaimon

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimon {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('mick',20)")}}

在idea中运行这个代码。

接下来创建一个类来查询一下Paimon中的系统表。

创建object:FlinkPaimonSystemTable

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTable {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table$snapshots").print()//schema信息表,对应的其实就是hdfs中表的schema目录下的schema-*文件信息println("====================schema信息表===========================")tEnv.executeSql("SELECT * FROM query_table$schemas").print()//manifest信息表,对应的其实就是hdfs中表的manifest目录下的manifest-*文件信息println("====================manifest信息表===========================")tEnv.executeSql("SELECT * FROM query_table$manifests").print()//file信息表,对应的其实就是hdfs中表的bucket-*目录下的data-*文件信息println("====================file信息表===========================")tEnv.executeSql("SELECT * FROM query_table$files").print()//option信息表,对应的就是建表语句中with里面指定的参数信息,在表的schema-*文件中也能看到option信息println("====================option信息表===========================")tEnv.executeSql("SELECT * FROM query_table$options").print()//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()//audit log信息表,相当于是表的审核日志,可以看到表中每条数据的rowkind,也就是+I\-U\+U\-Dprintln("====================audit log信息表===========================")tEnv.executeSql("SELECT * FROM query_table$audit_log").print()}
}

运行代码。
注意:在本地执行flink sql中的print,会看到下面错误:

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1026)at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:899)at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:823)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:219)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at tech.xuwei.paimon.query.FlinkPaimonSystemTable$.main(FlinkPaimonSystemTable.scala:35)at tech.xuwei.paimon.query.FlinkPaimonSystemTable.main(FlinkPaimonSystemTable.scala)

这个异常不影响程序执行,实际工作中我们不会写这种代码,一般都是在sql中写insert into select语句了,在这主要是为了方便测试,忽略这个异常即可。

如果感觉看起来比较乱,可以修改一下log4j.properties日志中的告警级别,改为error级别即可。

log4j.rootLogger=error,stdoutlog4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

重新运行代码,可以看到如下结果:

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 8f74d97b-bf6b-4ac7-bb47-3bb... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:22.859 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 49412497-1749-4566-8bf8-1c5... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:24.802 |                    2 |                    1 |                      0 | -9223372036854775808 |
| +I |                    3 |                    0 | e55e756d-e528-4b7c-97f0-a01... |  9223372036854775807 |                         APPEND | 2023-07-28 17:35:26.409 |                    3 |                    1 |                      0 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
3 rows in set
====================schema信息表===========================
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |            schema_id |                         fields |                 partition_keys |                   primary_keys |                        options |                        comment |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    0 | [{"id":0,"name":"name","typ... |                             [] |                       ["name"] |                             {} |                                |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
1 row in set
====================manifest信息表===========================
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| op |                      file_name |            file_size |      num_added_files |    num_deleted_files |            schema_id |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
| +I | manifest-800ac729-22d3-494b... |                 1665 |                    1 |                    0 |                    0 |
| +I | manifest-61d14e4e-d2a0-42ac... |                 1675 |                    1 |                    0 |                    0 |
| +I | manifest-fd8e45b0-d456-467a... |                 1673 |                    1 |                    0 |                    0 |
+----+--------------------------------+----------------------+----------------------+----------------------+----------------------+
3 rows in set
====================file信息表===========================
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                      partition |      bucket |                      file_path |                    file_format |            schema_id |       level |         record_count |   file_size_in_bytes |                        min_key |                        max_key |              null_value_counts |                min_value_stats |                max_value_stats |           creation_time |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                             [] |           0 | data-6b23bcaf-3dbe-46c0-a67... |                            orc |                    0 |           0 |                    1 |                  566 |                         [jack] |                         [jack] |                {age=0, name=0} |            {age=18, name=jack} |            {age=18, name=jack} | 2023-07-28 17:35:22.453 |
| +I |                             [] |           0 | data-ce40f0df-aa2a-4682-8b6... |                            orc |                    0 |           0 |                    1 |                  581 |                         [mick] |                         [mick] |                {age=0, name=0} |            {age=20, name=mick} |            {age=20, name=mick} | 2023-07-28 17:35:26.257 |
| +I |                             [] |           0 | data-ac9bd895-2b8e-4efe-969... |                            orc |                    0 |           0 |                    1 |                  572 |                          [tom] |                          [tom] |                {age=0, name=0} |             {age=19, name=tom} |             {age=19, name=tom} | 2023-07-28 17:35:24.603 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+-------------+----------------------+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
3 rows in set
====================option信息表===========================
Empty set
====================tag信息表===========================
Empty set
====================consumer信息表===========================
Empty set
====================audit log信息表===========================
+----+--------------------------------+--------------------------------+-------------+
| op |                        rowkind |                           name |         age |
+----+--------------------------------+--------------------------------+-------------+
| +I |                             +I |                           jack |          18 |
| +I |                             +I |                           mick |          20 |
| +I |                             +I |                            tom |          19 |
+----+--------------------------------+--------------------------------+-------------+
3 rows in set
(2)批量读取

下面演示一下如何在批量读取中使用时间旅行功能。

创建object:tech.xuwei.paimon.query.FlinkPaimonBatchQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,只读取最新快照中的所有数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 在批处理模式下和latest-full的效果一致|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 只读取指定id的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 在批处理模式下和from-snapshot的效果一致|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 只读取指定时间戳的快照中的所有数据|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between' = '1,3') */ -- 指定两个快照id,查询这两个快照之间的增量变化|-- /*+ OPTIONS('scan.mode'='incremental','incremental-between-timestamp' = '1690536922859,1690536926409') */ -- 指定两个时间戳,查询这两个快照之间的增量变化|""".stripMargin).print()}
}

运行代码,查看每一种策略的数据结果。

注意:在演示compacted-full这种策略的时候需要给表开启full-compaction

所以重新创建一个新的表。

创建object:FlinkSQLWriteToPaimonForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table_compact`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)WITH(|    'changelog-producer' = 'full-compaction',|    'full-compaction.delta-commits' = '1'|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('jack',18)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('tom',19)")tEnv.executeSql("INSERT INTO query_table_compact(name,age) VALUES('mick',20)")}}

运行代码。

再创建一个新的读取数据的类:

创建object:FlinkPaimonBatchQueryForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 批量读取* Created by xuwei*/
object FlinkPaimonBatchQueryForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'batch';env.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//批量查询数据tEnv.executeSql("""|SELECT * FROM query_table_compact|/*+ OPTIONS('scan.mode' = 'compacted-full') */ --表需要开启full-compaction,设置changelog-producer和full-compaction.delta-commits|""".stripMargin).print()}
}

运行代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+

由于目前每一次提交数据都会触发完全压缩,所以我们查询最新的完全压缩快照中的数据是可以获取到所有数据的。

此时可以通过系统表查看一下这个表的snapshot信息:
创建object:FlinkPaimonSystemTableForCompact

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForCompact {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//snapshot信息表,对应的其实就是hdfs中表的snapshot目录下的snapshot-*文件信息println("====================snapshot信息表===========================")tEnv.executeSql("SELECT * FROM query_table_compact$snapshots").print()}
}

执行代码,可以看到如下结果

====================snapshot信息表===========================
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| op |          snapshot_id |            schema_id |                    commit_user |    commit_identifier |                    commit_kind |             commit_time |   total_record_count |   delta_record_count | changelog_record_count |            watermark |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+
| +I |                    1 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:07.293 |                    1 |                    1 |                      0 | -9223372036854775808 |
| +I |                    2 |                    0 | 38d8f3a4-aeb3-4072-90cf-421... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:08.211 |                    3 |                    2 |                      1 | -9223372036854775808 |
| +I |                    3 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:09.423 |                    4 |                    1 |                      0 | -9223372036854775808 |
| +I |                    4 |                    0 | 84203720-0e42-40a6-8202-642... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:09.641 |                    8 |                    4 |                      1 | -9223372036854775808 |
| +I |                    5 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                         APPEND | 2023-07-28 17:57:11.500 |                    9 |                    1 |                      0 | -9223372036854775808 |
| +I |                    6 |                    0 | 25d0f600-076a-407f-a07a-caf... |  9223372036854775807 |                        COMPACT | 2023-07-28 17:57:12.130 |                   15 |                    6 |                      1 | -9223372036854775808 |
+----+----------------------+----------------------+--------------------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+------------------------+----------------------+

此时可以看到在commit_kind这一列中显示的有APPENDCOMPACT,表示这个快照是追加产生的还是完全压缩产生的。

由于我们配置的每一次提交数据都会触发完全压缩,所以对应的有3个完全压缩产生的快照。

为了便于验证,我们可以把最新的那个完全压缩的快照删除掉,再执行查询,看看结果是什么样的:

删除最新的完全压缩的快照:

[root@bigdata04 ~]# hdfs dfs -rm -r /paimon/default.db/query_table_compact/snapshot/snapshot-6

注意:这个删除操作建议大家在命令行执行,不要在web页面执行,在web页面删除可能会直接把这个表的目录删除掉!!!!!

然后再执行FlinkPaimonBatchQueryForCompact,结果如下:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                            tom |          19 |
+----+--------------------------------+-------------+
2 rows in set

注意:此时最新的完全压缩的快照就是snapshot-4了,这个快照中只有2条数据。

这就是批量读取中时间旅行参数的使用。

(3)流式读取

下面演示一下如何在流式读取中使用时间旅行功能。

创建object:FlinkPaimonStreamingQuery

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQuery {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|-- /*+ OPTIONS('scan.mode'='latest-full') */ -- 默认策略,可以省略不写,第一次启动时读取最新快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='latest') */ -- 只读取最新的变更数据|-- /*+ OPTIONS('scan.mode'='from-snapshot','scan.snapshot-id' = '2') */ -- 从指定id的快照开始读取变更数据(包含后续新增)|-- /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '2') */ -- 第一次启动时读取指定id的快照中的所有数据,然后继续读取后续新增的变更数据|-- /*+ OPTIONS('scan.mode'='from-timestamp','scan.timestamp-millis' = '1690536924802') */ -- 从指定时间戳的快照开始读取变更数据(包含后续新增)|""".stripMargin).print()}
}
(4)Consumer ID

最后我们在流式读取这里扩展一个知识点:Consumer ID,这个功能是针对流式读取设计的。

相当于我们在kafka消费者中指定一个groupid,这样可以通过groupid维护消费数据的偏移量信息,便于任务停止以后重启的时候继续基于之前的进度进行查询。

在这里Consumer ID的主要作用是为了方便记录每次查询到的数据快照的位置,他会把下一个还未读取的快照id记录到hdfs文件中。
当之前的任务停止以后,新启动的任务可以基于之前任务记录的快照id继续查询数据,不需要从状态中恢复位置信息。

这个特性目前属于实验特性,还没有经过大量生产环境的验证,大家可以先提前了解一下。

下面来结合一个案例演示一下:
具体的思路是这样的:

  • 1:首先使用Consumer ID查询一次query_table表中的数据。
  • 2:然后停止之前的查询任务,向query_table表中模拟产生1条数据。
  • 3:重新启动第1步骤中的任务,验证一下是否只读取到了新增的那1条数据

创建object:FlinkPaimonStreamingQueryForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 流式读取* Created by xuwei*/
object FlinkPaimonStreamingQueryForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//SET 'execution.runtime-mode' = 'streaming';env.setRuntimeMode(RuntimeExecutionMode.STREAMING)//使用流处理模式val tEnv = StreamTableEnvironment.create(env)//注意:在流处理模式中,操作Paimon表时需要开启Checkpoint。env.enableCheckpointing(5000)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//流式查询数据tEnv.executeSql("""|SELECT * FROM query_table|/*+ OPTIONS('consumer-id'='con-1') */ -- 指定消费者id|""".stripMargin).print()}
}

注意:在这需要开启checkpoint,否则Consumer ID的功能无法正常触发。

第一次执行此代码,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                           jack |          18 |
| +I |                           mick |          20 |
| +I |                            tom |          19 |

停止此代码。

此时其实可以到hdfs中查看一下维护的Consumer ID信息:

[root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/query_table/consumer/consumer-con-1
{"nextSnapshot" : 4
}

这里面记录的是下一次需要读取的快照id,数值为4,此时最新的快照id是3,因为快照id为3的快照已经读取过了,下一个快照id就是4了。

其实直接查询consumer系统表也是可以看到这些信息的。

创建object:FlinkPaimonSystemTableForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 查询Paimon中的系统表* Created by xuwei*/
object FlinkPaimonSystemTableForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.BATCH)//使用批处理模式val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//consumer信息表,在查询数据的sql语句中指定了consumer-id之后才能看到println("====================consumer信息表===========================")tEnv.executeSql("SELECT * FROM query_table$consumers").print()}
}

执行代码,可以看到如下结果:

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    4 |
+----+--------------------------------+----------------------+
1 row in set

从这可以看出来,next_snapshot_id4,查出来的结果是一样的。

接下来我们向query_table中新增一条数据。

创建object:FlinkSQLWriteToPaimonForConsumerid

代码如下:

package tech.xuwei.paimon.queryimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 通过FlinkSQL 向Paimon中模拟写入数据* Created by xuwei*/
object FlinkSQLWriteToPaimonForConsumerid {def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH (|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//创建Paimon类型的表tEnv.executeSql("""|CREATE TABLE IF NOT EXISTS `query_table`(|    name STRING,|    age INT,|    PRIMARY KEY (name) NOT ENFORCED|)|""".stripMargin)//写入数据tEnv.executeSql("INSERT INTO query_table(name,age) VALUES('jessic',30)")}}

执行代码。

最后,我们再重新启动FlinkPaimonStreamingQueryForConsumerid,可以看到如下结果:

+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                         jessic |          30 |

能看到这个结果,说明这个consumer id生效了,当我们第二次使用相同的consumer id读取这个表的时候,是可以基于之前的进度继续读取的。

停止此任务。

此时再执行FlinkPaimonSystemTableForConsumerid,查看最新的next_snapshot_id

====================consumer信息表===========================
+----+--------------------------------+----------------------+
| op |                    consumer_id |     next_snapshot_id |
+----+--------------------------------+----------------------+
| +I |                          con-1 |                    5 |
+----+--------------------------------+----------------------+
1 row in set

此时next_snapshot_id变成了5,这是正确的。

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/171223.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口开放太麻烦?试试阿里云API网关吧

前言 我在多方合作时,系统间的交互是怎么做的?这篇文章中写过一些多方合作时接口的调用规则和例子,然而,接口开放所涉及的安全、权限、监控、流量控制等问题,可不是简简单单就可以解决的,这一般需要专业的…

ElasticSearch简单操作

目录 1.单机部署 1.1 解压软件 1.2 创建软链接 1.3 修改配置文件 1.4 配置环境变量 1.5 后台启动 2.配置分词器 2.1 安装IK分词器 2.2 ES 扩展词汇 3.常用操作 3.1 索引 3.1.1 创建索引 3.1.2 查看所有索引 3.1.3 查看单个索引 3.1.4 删除索引 3.2.文档 3.2.1…

@Async注解的坑

问题描述 一个方法调用另一个方法(该方法使用Async注解)在同一个类文件中,该注解会失效! 问题复现 TestAsyncController 类 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; im…

双十一大促已过,虾皮、Lazada年底如何通过测评补单打造搜索排名

双十一大促已过,有人欢喜有人忧,不管怎么样,年底的这波旺季还是要好好把握的。 如何提升虾皮搜索排名 1、标题关键词匹配度 Shopee、Lazada的排名规则主要是根据用户搜索时输入的关键字和卖家的商品标题、描述等是否相匹配来进行排名&…

P6入门:项目初始化7-项目详情之代码/分类码Code

前言 使用项目详细信息查看和编辑有关所选项目的详细信息,在项目创建完成后,初始化项目是一项非常重要的工作,涉及需要设置的内容包括项目名,ID,责任人,日历,预算,资金,分类码等等&…

qnx log 系统

前言 本文主要介绍QNX 系统中的 log 打印相关接口和使用方法 软件环境:qnx7.1 一、QNX查看 log 的工具 slog2info 1. slog2info 的相关介绍 和linux 中查看 kernel log 信息的 dmesg 命令一样, qnx 里面也有一个查看 log 信息的命令,那就是 slog2info 命令, 如下图所示是…

Spark SQL 每年的1月1日算当年的第一个自然周, 给出日期,计算是本年的第几周

一、问题 按每年的1月1日算当年的第一个自然周 (遇到跨年也不管,如果1月1日是周三,那么到1月5号(周日)算是本年的第一个自然周, 如果按周一是一周的第一天) 计算是本年的第几周,那么 spark sql 如何写 ? 二、分析 …

0基础学习PyFlink——水位线(watermark)触发计算

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。 为了解决长期不计算的问题&a…

Arthas(阿尔萨斯)--(二)

目录 一、Arthas学习 1、JVM相关命令一 1、dashboard 2、thread 3、jvm 4、sysprop 一、Arthas学习 Arthas(阿尔萨斯)--(一) Arthas代码开源地址 1、JVM相关命令一 1、dashboard dashboard:显示当前系统的实时数据面板,按q或ctrlc退出 ID: Java 级别的线…

Docker - 容器数据卷

Docker - 容器数据卷 什么是容器数据卷 等同于挂载,将容器内的目录地址指向于宿主机文件系统中 直接使用命令来挂载 -v docker run -it -v 主机目录:容器内目录# 测试 docker run -it -v /root:/home centos /bin/bash [rootiZ2zeg7mctvft5renx1qvbZ ~]# docker …

vscode 快速打印console.log

第一步 输入这些 {// Print Selected Variabl 为自定义快捷键中需要使用的name,可以自行修改"Print Selected Variable": {"body": ["\nconsole.log("," %c $CLIPBOARD: ,"," background-color: #3756d4; padding:…

Linux---(六)自动化构建工具 make/Makefile

文章目录 一、make/Makefile二、快速查看(1)建立Makefile文件(2)编辑Makefile文件(3)解释(4)效果展示 三、背后的基本知识、原理(1)如何清理对应的临时文件呢…