Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium 格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。

同时,Flink 还有一种专门的数据格式 debezium-json,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?

首先,我们要主要到这样一个细节:mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式,对外,这不可配置,也不可见,只有向下游传导数据时,才会涉及到解析和转换的问题。

其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:

使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。

Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,但是,这都是发生在 Flink CDC 内部的,对外是不可见的!当需要把 CDC 数据传给下游时,才会针对下游指定的格式进行转换,这种转换也是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。

我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。

原生 Debezium 格式(样例)

使用如下 SQL 创建一个 mysql-cdc 的源表:

SET 'sql-client.execution.result-mode' = 'TABLEAU';DROP TABLE IF EXISTS orders_mysql_cdc;CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '10.0.13.30','port' = '3307','username' = 'root','password' = 'Admin1234!','database-name' = 'inventory','table-name' = 'orders'
);

那从 Flink CDC 源表提取出来的数据应该是什么样子呢?前面我们已经说过,这个动作发生在 Flink CDC 内部,提取的数据也是外部不可见的,那我们能不能从其他渠道确定实际的数据格式吗?能,如果说 Flink CDC 就是通过 Debezium 来采集数据,那么采集到的最原始的数据格式就是标准的 Debezium 格式,通常,这是这个样子的:

{"before": null,"after": {"osci.mysql-server-3.inventory.orders.Value": {"order_number": 10006,"order_date": 16852,"purchaser": 1003,"quantity": 1,"product_id": 107}},"source": {"version": "2.2.0.Final","connector": "mysql","name": "osci.mysql-server-3","ts_ms": 1705645511000,"snapshot": {"string": "false"},"db": "inventory","sequence": null,"table": {"string": "orders"},"server_id": 223344,"gtid": null,"file": "mysql-bin.000004","pos": 640,"row": 0,"thread": {"long": 10},"query": null},"op": "c","ts_ms": {"long": 1705645511455},"transaction": null
}

再次强调,上述格式的数据在 Flink CDC 中是不可见的,发生于 Flink CDC 内部,以上格式是标准的 debezium 数据格式,Flink CDC一定是率先拿到了这种格式的数据然后再经处理转发给下游的,比如:如果 DDL 中提取了某些元数据,也是从上面这种原始的 Debezium 数据中获取的。

debezium-json 格式(样例)

如下的 SQL 在 Kafka 上创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ... 把源表和目标表的数据流驱动起来:

DROP TABLE IF EXISTS orders_kafka_debezium_json;CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (order_number int,order_date   date,purchaser    int,quantity     int,product_id   int
) WITH ('connector' = 'kafka','topic' = 'orders_kafka_debezium_json','properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092','properties.group.id' = 'orders_kafka_debezium_json','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json'
);-- 提交持续查询,驱动整个 Pipelineinsert into orders_kafka_debezium_json select * from orders_mysql_cdc;

这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:

{"before": {"order_number": 10003,"order_date": "2016-02-19","purchaser": 1002,"quantity": 2,"product_id": 106},"after": null,"op": "d"
}

结论

比较上述两种消息格式就能看出:

debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,原生 debezium 格式仅发生并存在于 Flink CDC 内部,对外不可见,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/625069.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过meavn引用jar包

方法一&#xff1a;引用jar包&#xff08;常用&#xff09; 创建一个lib包&#xff0c;将jar包导入lib包中 配置pom文件 <dependency><groupId>com.by</groupId><artifactId>test-jar</artifactId><version>1.0-SNAPSHOT</version>…

基于JSP的电器网上订购系统

本系统利用现在比较广泛的JSP结合后台SpringMybatisAjax编写程序的方式实现的。 在意见箱板块中&#xff0c;运用JSP通过JDBC技术和后台的数据库进行交互的方式将数据信息反馈给用户和管理员&#xff1b;在登录系统中&#xff0c;使用Ajax技术实现异步交互&#xff0c;在不更新…

[AI]windows部署Ollama

1、下载&&安装Ollama 下载地址&#xff1a;Download Ollama on Windows 1&#xff09;下载完成后直接点击exe文件进行安装即可&#xff0c;安装程序不能选择安装目录 2&#xff09;完成后执行cmd命令&#xff0c;输入ollama命令&#xff0c;如下即表示成功 2、配置模…

Flutter 插件站新升级: 加入优秀 GitHub 开源项目

Flutter 插件站新升级: 加入优秀 GitHub 开源项目 视频 https://youtu.be/qa49W6FaDGs https://www.bilibili.com/video/BV1L1421o7fV/ 前言 原文 https://ducafecat.com/blog/flutter-awesome-github-repo-download 这几天晚上抽空把 Flutter 插件站升级&#xff0c;现在支…

Rust面试宝典第4题:打家劫舍

题目 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统。如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个房屋存放金额的非负整…

文心一言用户数突破2亿 百度官宣三大AI开发神器

在日益激烈的竞争中&#xff0c;百度正在中国AI市场努力保持领导者地位&#xff0c;文心一言用户规模突破2亿&#xff0c;较去年年底翻了一番。 4月16日周二&#xff0c;以“创造未来”为主题的Create 2024百度AI开发者大会在深圳国际会展中心举办。百度CEO李彦宏在会议上指出…

026——项目管理与由来

目录 作者有话说 项目的管理方式 develop分支管理 作者有话说 已经出了25期的文章了&#xff0c;一直没说过我在做个什么。相信大家也有这个以后&#xff0c;作者写了几M的代码到现在不会只是为了点个灯吧。要是这我几十行代码就能解决。 这是一个小故事&#xff0c;老粉丝都…

P9241 [蓝桥杯 2023 省 B] 飞机降落

原题链接&#xff1a;[蓝桥杯 2023 省 B] 飞机降落 - 洛谷 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 dfs全排列的变形题。 因为最后问飞机是否降落&#xff0c;并且一架飞机降落完毕时另一架飞机才能降落。所以我们设置dfs的两个变量cnt为安全…

【一竞技CS2】VP战队官宣签下electroNic取代mir

1、近日VP战队官宣签下electroNic&#xff0c;以取代阵容中的mir。 electroNic自己也表示&#xff1a;“VP是一支顶级队伍。阵容核心曾赢得Major冠军&#xff0c;所有队员都处于巅峰状态并且时刻准备着去争夺冠军。我们有着一样的雄心壮志。 此外我还对和Jame很感兴趣&#xf…

原始部落版本潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏

原始部落版本潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏 潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏 潮玩宇宙大逃杀小游戏模块成品源码&#xff0c;可嵌入任何平台系统&#xff0c;增加用户粘性&#xff0c;消除泡沫&#xff0c;短视频直播引流。 玩家选择一间房间躲避杀手…

论文笔记:(INTHE)WILDCHAT:570K CHATGPT INTERACTION LOGS IN THE WILD

iclr 2024 spotlight reviewer 评分 5668 1 intro 由大型语言模型驱动的对话代理&#xff08;ChatGPT&#xff0c;Claude 2&#xff0c;Bard&#xff0c;Bing Chat&#xff09; 他们的开发流程通常包括三个主要阶段 预训练语言模型在被称为“指令调优”数据集上进行微调&…

竞赛 基于GRU的 电影评论情感分析 - python 深度学习 情感分类

文章目录 1 前言1.1 项目介绍 2 情感分类介绍3 数据集4 实现4.1 数据预处理4.2 构建网络4.3 训练模型4.4 模型评估4.5 模型预测 5 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于GRU的 电影评论情感分析 该项目较为新颖&#xff0c;适合作为竞…