Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/536635.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024 前端javaScript+ES6

JavaScript 基础 1、基本数据类型: 1.1 基本数据类型: Number(数值):表示数字,包括整数和浮点数。例如:5、3.14。 String(字符串):表示文本数据&#xff…

Apache zookeeper kafka 开启SASL安全认证

背景:我之前安装的kafka没有开启安全鉴权,在没有任何凭证的情况下都可以访问kafka。搜了一圈资料,发现有关于sasl、acl相关的,准备试试。 简介 Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发…

数据治理——滴滴大数据成本治理实践

原文大佬的这篇大数据平台成本治理实践是有借鉴意义的,这些摘抄下来用作沉淀学习。如有侵权,请告知~ 一、滴滴大数据成本治理总体框架 1.1 数据体系 从上图所示:最底层是以数据引擎为基础的数据存储,分为离线计算、实时计算、OL…

欧科云链做客Google Cloud与WhalerDAO专题论坛,畅谈Web3数据机遇

3月10日,由Google Cloud、WhalerDAO和baidao data主办,以Web3AI 2024 DATA POWER为主题的分享会在北京中关村举行。欧科云链高级研究员Jason Jiang受邀参加活动,带来“从链上数据发掘Web3时代的无限机遇”的主题分享。 Web3.0核心要素始终是链…

数据结构:6、栈

一、栈的概念 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端 称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(Last In First Out)的原则。 压栈&#x…

分类预测 | Matlab实现GSWOA-KELM混合策略改进的鲸鱼优化算法优化核极限学习机的数据分类预测

分类预测 | Matlab实现GSWOA-KELM混合策略改进的鲸鱼优化算法优化核极限学习机的数据分类预测 目录 分类预测 | Matlab实现GSWOA-KELM混合策略改进的鲸鱼优化算法优化核极限学习机的数据分类预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 GSWOA-KELM分类&#xff0…

基于Java的天然气工程业务管理系统(Vue.js+SpringBoot)

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、使用角色3.1 施工人员3.2 管理员 四、数据库设计4.1 用户表4.2 分公司表4.3 角色表4.4 数据字典表4.5 工程项目表4.6 使用材料表4.7 使用材料领用表4.8 整体E-R图 五、系统展示六、核心代码6.1 查询工程项目6.2 工程物资…

【how2j练习题】css部分课后练习

第一题 <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"> </head><style> body{font-family:"宋体";font-size:13px;color:#666666;width:643px;}.bold{font-weight:bold;}div.t…

C# wpf 使用GDI实现截屏

wpf截屏系列 第一章 使用GDI实现截屏&#xff08;本章&#xff09; 第二章 使用GDI实现截屏 第三章 使用DockPanel制作截屏框 第四章 实现截屏框热键截屏 第五章 实现截屏框实时截屏 第六章 使用ffmpeg命令行实现录屏 文章目录 wpf截屏系列前言一、导入gdi32方法一、NuGet获取…

【框架学习 | 第六篇】SpringBoot基础篇(快速入门、自动配置原理分析、配置文件、整合第三方技术、拦截器、文件上传/下载、访问静态资源)

文章目录 1.SpringBoot简介1.1原有Spring优缺点分析1.1.1Spring优点1.1.2Spring缺点 1.2SpringBoot概述1.2.1SpringBoot解决上述Spring的缺点1.2.2SpringBoot特点1.2.3SpringBoot核心功能 2.SpringBoot快速入门2.1代码实现2.1.1创建Maven工程2.1.2添加SpringBoot的起步依赖2.1.…

走进网络世界 了解一些基础知识

走进网络 1.认识计算机 1.计算机网络是由计算机和通讯构成的&#xff0c;网络研究的是“通信”。 ------1946 世界上第一台计算机 2.终端&#xff1a;只有输入和输出功能&#xff0c;没有计算和处理功能。3.数据&#xff1a;一串数字&#xff08;二进制数&#xff09;&#x…

OpenFeign服务接口调用

OpenFeign服务接口调用 1、OpenFeign简介 ​ Feign是一个声明性web服务客户端。它使编写web服务客户端变得更容易。使用Feign创建一个接口并对其进行注释。它具有可插入的注释支持&#xff0c;包括Feign注释和JAX-RS注释。Feign还支持可插拔编码器和解码器。Spring Cloud添加…