CloudCanal x Debezium 打造实时数据流动新范式

简述

Debezium 是一个开源的数据订阅工具,主要功能为捕获数据库变更事件发送到 Kafka。

CloudCanal 近期实现了从 Kafka 消费 Debezium 格式数据,将其 同步到 StarRocks、Doris、Elasticsearch、MongoDB、ClickHouse 等 12 种数据库和数仓,补全其数据到达能力。

本文将先简单介绍该项技术实现的背景,再通过 MySQL -> Kafka -> Starrocks 示例展示此功能。

image.png

为什么要消费 Debezium 格式数据

高流行度

Debezium 是一个高质量、被大量项目集成的开源项目,社区用户活跃,官方维护积极,修复 bug、增加新特性,不断更新版本。

作为 Kafka Connect 生态系统的一部分,Debezium 能够无缝与 Kafka 进行对接,为用户后端数据处理提供了强大的 实时数据准备 能力。

由此形成的高流行度,让每一个数据行业从业者不能忽视其影响力。

合理的消息结构

Schema(数据结构) 遵循 Kafka Connect 标准,提供了详细的字段信息。

"schema": {"type": "struct","fields": [{"type": "int32", "optional": false, "field": "id"},{"type": "string", "optional": false, "field": "name"},{"type": "int32", "optional": false, "field": "age"}],"optional": false, "name": "my_database.user.Value"
}

Payload(数据)包含实际的数据库变更数据,与 Schema 中定义的字段对应。

"payload": {"id": 123,"name": "John Doe","age": 30,"source": {...}
}

此外消息还携带了源端数据源全面的关联信息,包括库、表、时间戳、位点等信息。整体格式实用、简洁。

支持 Schema 演进

Debezium 不仅捕获数据库模式的当前状态,还能感知和记录每次模式变更细节。

当数据库表结构发生变化时(如添加、删除、修改字段等),Debezium 能够 实时捕获这些结构变更,确保变更事件的精准传递。

另外 Debezium 会为每个捕获的变更事件 记录包含当前和先前 Schema 的历史记录

这意味着 可追溯任何时刻数据库 Schema,了解特定时间点表字段、数据类型等信息, 并且可精准还原数据库在某一时刻的结构,无需额外的查询或推测。

CDC 数据格式标准

Debezium 数据 Schema 基于 Kafka Connect 标准设计,这使 Debezium 产生的变更事件能够轻松地集成到各种 Kafka Connect 连接器中,实现了与 Kafka 生态系统的顺畅对接。

这个设计使得 Debezium 数据 Schema 有望成为 CDC(Change Data Capture) 领域标准,为实时数据流的流动提供了基础设施。

端到端的缺憾

Debezium 集如此众多的优点,但是其官方缺少消息到对端的能力(目前有在补充),这让一部分用户感觉束手无策,CloudCanal 支持消费 Debezium 数据即解决这个问题,为用户实时数据生态建设贡献绵薄之力。

支持 Debezium 的主流 CDC 技术比较

对于使用 Debezium 的用户来说,消费 Kafka 中的 Debezium 数据并将其写入其他数据源,有几种主流 CDC 技术可选,如下表。

Kafka-ConnectFlink-CDCCloudCanal
同步配置配置文件代码/配置(新版本)可视化
同步性能(延迟)优秀优秀优秀
社区支持一般积极积极
大规模部署使用一般优秀优秀
消息格式符合其标准的 JSON、Avro…Debezium JSON、Canal JSON、Maxwell JSONDebezium JSON、Canal JSON、CloudCanal JSON 等
插件支持Oracle、MySQL、SqlServer…Oracle、MySQL、SqlServer…StarRocks、Doris、Elasticsearch 等 12 种

CloudCanal 支持 Debezium 做了那些事

CloudCanal 之前即实现了将数据库数据以 Debezium 格式写入目标端 Kafka 的能力,并在兼容性方面做了大量优化。

此次版本更新则支持从 Kafka 消费 Debezium 格式数据,并同步到对端数据库或数仓, 形成基于 Kafka 中转的端到端数据迁移同步能力,同时可平滑对接上/下游已使用其他工具且以 Debezium 数据格式载体的需求。

操作示例

Debezium 环境准备

  • 相关资源一键部署 (Docker): debezium-test.tar.gz
    • Kafka 集群 + Kafka UI
    • Debezium
    • MySQL (源端)
    • Starrocks (目标端)
    tar -xzvf debezium-test.tar.gz
    sh install.sh
    

创建 MySQL Source Connector

  • 源端是 MySQL,通过下面的表进行创建。

    CREATE DATABASE `inventory`;CREATE TABLE `inventory`.`customer` (`c_int` int NOT NULL,`c_bigint` bigint NOT NULL, `c_decimal` decimal(10,3) NOT NULL,`c_date` date NOT NULL,`c_datetime` datetime NOT NULL,`c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`c_year` int NOT NULL,`c_varchar` varchar(10) NOT NULL,`c_text` text NOT NULL,PRIMARY KEY (`c_int`)
    );
    
  • 通过 Debezium 的 Api 接口创建 Connector 订阅 MySQL 的变更事件。

    curl -i -X POST http://127.0.0.1:7750/connectors \-H 'Content-Type: application/json' \-d '{"name": "connector-test-mx","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "112.124.38.87","database.port": "25000","database.user": "root","database.password": "123456","database.server.id": "1","database.server.name": "mx","database.include.list": "inventory","topic.prefix": "mx","table.include.list": "inventory.customer","snapshot.mode": "never","database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.topic": "mx.schemahistory.customer","database.history.kafka.topic": "mx.mx_history_schema","include.schema.changes": "false"     }}'
    
  • 创建后,查看 Connetor 的状态。

    curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status
    

CloudCanal 订阅 Kafka 的数据变更

准备 CloudCanal

  • 下载安装 CloudCanal 私有部署版本

添加数据源

  • 数据源管理 -> 添加数据源, 添加 Kafka、Starrocks、MySQL
    image.png
    image.png
    image.png

创建同步任务

  • 任务管理-> 新建任务

  • Kafka选择 Debezium Envelope Json Format格式

  • 该消息格式的说明,参见:源端 Kafka Debezium Json 使用说明
    image.png
    image.png

  • Kafka 消息中如果有 Schema,需要在 任务详细 -> 参数修改 -> 源数据源配置 中修改 envelopSchemaIncludetrue
    image.png

同步测试

  • 源端数据库做数据变更,Debezium 将数据写入 Kafka 后,CloudCanal 会写入到 Starrocks 中。
    image.png

  • 数据同步结束后校验 MySQL 和 Starrocks 的数据,40 万左右的数据是一致的。
    image.png

总结

本文介绍了 CloudCanal 支持消费 Debezium 格式数据的背景,以及通过 MySQL -> Kafka -> Starrocks 示例介绍其使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/292243.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过fastlane打包上传到appstore

需要mac设备 一、Fastlane 安装 1、安装 Xcode command line tools: 命令行 xcode-select --install 2、安装Fastlane 命令行 brew install fastlane 3、初始化Fastlane cd到你的需要打包的项目目录,然后执行命令 fastlane init 解释&#xff1a…

华为云开发者日·2023年度创享峰会圆满落幕 | 开发者是底色

“ 共创科技未来,点亮智慧之光。” 文 | 云舒 编辑 | 小白 出品|极新 一场科技盛宴,汇聚了无数智慧与激情。2023年12月20日,华为云开发者日2023年度创享峰会在北京市中关村科学城东升科技园盛大举行。 华为云开发者日是面向全…

plc无线远程模块,实现PLC远程数采

在工业自动化领域,PLC(可编程逻辑控制器)的应用日益广泛。然而,传统的PLC数据采集方式通常受限于有线连接,限制了数据的灵活性和可访问性。在这个背景下,PLC无线远程模块成为了解决方案的热点。今天&#x…

飞天使-k8s知识点4-验证安装好后功能

文章目录 接k8s知识点2之验证集群功能创建dashboard验证安装nginx 并访问tomcat 访问 接k8s知识点2之验证集群功能 [rootkubeadm-master2 tmp]# kubectl run net-test1 --imagealpine sleep 36000 pod/net-test1 created [rootkubeadm-master2 tmp]# kubectl get pod NAME …

配置MUX VLAN示例(接入层设备)

一、组网需求 在企业网络中,企业所有员工都可以访问企业的服务器。但对于企业来说,希望企业内部部分员工之间可以互相交流,而部分员工之间是隔离的,不能够互相访问。为了解决上述问题,可在连接终端的交换机上部署MUX …

活动回顾 (上) | 2023 Meet TVM 系列活动完美收官

作者:xixi 编辑:三羊、李宝珠 2023 Meet TVM 年终聚会于 12 月 16 日在上海圆满落幕,本次 meetup 不仅邀请到了 4 位 AI 编译器专家为大家带来了精彩的分享,还新增了圆桌讨论环节,以更多元的视角和各位共同讨论大模型…

mysql统计函数round失效问题

mysql统计函数round失效问题 目录1、问题2、找到原因3、解决办法4、类似问题欢迎品论区补充~ 目录 1、问题 我的mysql版本为:8.1.0 我在mysql使用sum对数据统计后使用round函数进行四舍五入取整,发现像16.145这样的数字取小数后2位后是16.14而非16.15。…

模型评估方法

目录 数据集切分 交叉验证 交叉验证实例 混淆矩阵 实例 代码实现 阈值 全局阈值处理 自适应阈值处理 阈值对结果的影响 ROC曲线 数据集切分 数据集切分是指将一个数据集分割成训练集和测试集的过程。常用的方法是随机切分,即将数据集中的样本按照一定比…

小红书kos和kop有什么区别,营销玩法有哪些

相信熟悉媒介传播的朋友,对于kol和koc都不陌生。但随着平台的发展和市场的进步,又出现了kos和kop。那么小红书kos和kop有什么区别,营销玩法有哪些? 一、什么是kos和kop KOS,全称叫做Key Opinion Sales,意思…

智能变电站集中监控辅助决策系统解决方案

项目背景 智能变电站是坚强智能电网的重要基础和支撑。它不仅是电网运行数据的采集源头和命令执行单元,而且与其他环节的联系非常紧密,为统一坚强智能电网的安全和优质提供了关键支撑。为了更有效地管理和控制变电站的各种设备和子系统,如视…

FastAPI实现文件上传下载

FastAPI实现文件上传下载 1.后端FastAPI2.后端html3.效果 最近的项目需求,是前端vue,后端fastAPI,然后涉及到图像的消息发送,所以需要用fast写文件上传下载的接口,这里简单记录一下。 1.后端FastAPI import os.path i…

MySQL-2

复习 1. Data数据–>DB数据库–>DBMS数据库管理系统常见DBMS: MySQL oracle sql server db2 … redis Mongodb两大功能: 定义DDL 操纵DML 2. 表table创建表, 行和列 3. MySQL数据类型数据类型分成三大类:数值型、字符型、日期时间类4. 关于列属性…