Debezium日常分享系列之:向 Debezium 连接器发送信号

Debezium日常分享系列之:向 Debezium 连接器发送信号

  • 一、概述
  • 二、激活源信号通道
  • 三、信令数据集合的结构
  • 四、创建信令数据集合
  • 五、激活kafka信号通道
  • 六、数据格式
  • 七、激活JMX信号通道
  • 八、自定义信令通道
  • 九、Debezium 核心模块依赖项
  • 十、部署自定义信令通道
  • 十一、信号动作
  • 十二、记录信号
  • 十三、即席快照信号
  • 十四、特别快照停止信号
  • 十五、增量快照
  • 十六、增量快照暂停信号
  • 十七、增量快照恢复信号
  • 十八、阻止快照信号
  • 十九、应用案例

一、概述

Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如启动表的临时快照)的方法。要使用信号触发连接器执行指定操作,可以将连接器配置为使用以下一个或多个通道:

  • 源信号通道:可以发出 SQL 命令将信号消息添加到专门的信令数据集合中。在源数据库上创建的信令数据集合专门用于与 Debezium 进行通信。
  • Kafka信号通道;将信号消息提交到可配置的 Kafka 主题。
  • Jmx信号通道:通过 JMX 信号操作提交信号。
  • 文件信号通道:可以使用文件来发送信号。
  • Custom:将信号提交到实施的自定义通道。当 Debezium 检测到新的日志记录或临时快照记录添加到通道时,它会读取信号并启动请求的操作。

信号传输可与以下 Debezium 连接器一起使用:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

可以通过设置 signal.enabled.channels 配置属性来指定启用哪个通道。该属性列出了已启用的通道的名称。默认情况下,Debezium 提供以下渠道:source 和 kafka。源通道默认启用,因为增量快照信号需要它。

二、激活源信号通道

默认情况下,Debezium 源信令通道已启用。

必须为要使用它的每个连接器显式配置信令。

程序:

  • 在源数据库上,创建信令数据收集表,用于向连接器发送信号。
  • 对于实现本机变更数据捕获 (CDC) 机制的源数据库(例如 Db2 或 SQL Server),为信令表启用 CDC。
  • 将信令数据集合的名称添加到 Debezium 连接器配置中。在连接器配置中,添加属性 signal.data.collection,并将其值设置为您在步骤 1 中创建的信令数据集合的完全限定名称。

例如,signal.data.collection = inventory.debezium_signals。

信令集合的完全限定名称的格式取决于连接器。

以下示例显示了每个连接器使用的命名格式:

  • Db2:.
  • MongoDB:.
  • MySQL:.
  • Oracle:..
  • PostgreSQL:.
  • SQL Server:..

三、信令数据集合的结构

信令数据集合或信令表存储您发送到连接器以触发指定操作的信号。信令表的结构必须符合以下标准格式。

  • 包含三个字段(列)。
  • 字段按特定顺序排列,如表 1 所示。

表 1. 信令数据集合所需的结构

字段类型描述
id(required)string标识信号实例的任意唯一字符串。为提交到信令表的每个信号分配一个 ID。通常,ID 是 UUID 字符串。可以使用信号实例进行日志记录、调试或重复数据删除。当信号触发 Debezium 执行增量快照时,它会生成带有任意 id 字符串的信号消息。生成的消息包含的 id 字符串与提交信号中的 id 字符串无关。
type(required)string指定要发送的信号类型。可以将某些信号类型与任何可提供信号传输的连接器一起使用,而其他信号类型仅可用于特定的连接器。
data(optional)string指定要传递给信号操作的 JSON 格式的参数。每种信号类型都需要一组特定的数据。

数据集合中的字段名称是任意的。上表提供了建议的名称。如果使用不同的命名约定,请确保每个字段中的值与预期内容一致。

四、创建信令数据集合

可以通过向源数据库提交标准 SQL DDL 查询来创建信令表。

先决条件:

  • 有足够的访问权限在源数据库上创建表。

程序:

  • 向源数据库提交SQL查询,创建符合所需结构的表,如下例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

注意:

分配给 id 变量的 VARCHAR 参数的空间量必须足以容纳发送到信令表的信号 ID 字符串的大小。如果 ID 的大小超出可用空间,连接器将无法处理信号。

以下示例显示了创建三列 debezium_signal 表的 CREATE TABLE 命令:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

五、激活kafka信号通道

可以通过将 Kafka 信令通道添加到 signal.enabled.channels 配置属性,然后将接收信号的主题名称添加到 signal.kafka.topic 属性来启用 Kafka 信令通道。启用信令通道后,将创建 Kafka 消费者来消费发送到配置的信号主题的信号。

可供消费者使用的附加配置:

  • Db2 connector Kafka signal configuration properties
  • MongoDB connector Kafka signal configuration properties
  • MySQL connector Kafka signal configuration properties
  • Oracle connector Kafka signal configuration properties
  • PostgreSQL connector Kafka signal configuration properties
  • SQL Server connector Kafka signal configuration properties

注意:

  • 要使用 Kafka 信令触发大多数连接器的临时增量快照,必须首先在连接器配置中启用源信令通道。
  • 源通道实现了水印机制,以对可能由增量快照捕获并在流恢复后再次捕获的事件进行重复数据删除。
  • 使用信令通道触发启用GTID的只读MySQL数据库的增量快照时,不需要启用源通道。

六、数据格式

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

该值是具有类型和数据字段的 JSON 对象。

当信号类型设置为执行快照时,数据字段必须包括下表中列出的字段:

表 2. 执行快照数据字段

字段默认值
typeincremental要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。
data-collectionsN/A一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
additional-conditionN/A一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。
additional-conditionsN/A一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如您可以在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了典型的执行快照 Kafka 消息:

Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

七、激活JMX信号通道

可以通过将 jmx 添加到连接器配置中的 signal.enabled.channels 属性来启用 JMX 信号,然后启用 JMX MBean 服务器来公开信号 Bean。

程序

  • 使用首选的 JMX 客户端(例如 JConsole 或 JDK Mission Control)连接到 MBean 服务器。

  • 搜索 Mbean debezium.<连接器类型>.management.signals.<服务器>。 Mbean 公开接受以下输入参数的信号操作:

    • p0:信号的 ID。
    • p1:信号的类型,例如执行快照。
    • p2:包含有关指定信号类型的附加信息的 JSON 数据字段。
  • 通过提供输入参数的值来发送执行快照信号。

在 JSON 数据字段中,包含下表中列出的信息:

表 2. 执行快照数据字段

字段默认值
typeincremental要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。
data-collectionsN/A一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
additional-conditionN/A一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。
additional-conditionsN/A一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如可以在 snapshot.select.statement.overrides 属性中设置的语句。

下图显示了如何使用 JConsole 发送信号的示例:

在这里插入图片描述

八、自定义信令通道

信令机制被设计为可扩展的。可以根据需要实施通道,以最适合您环境的方式向 Debezium 发送信号。

添加信令通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署自定义信令通道。
  • 通过修改连接器配置,使连接器能够使用自定义信令通道。

提供自定义信令通道

自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供者接口 (SPI) 的 Java 类。例如:

public interface SignalChannelReader {String name(); void init(CommonConnectorConfig connectorConfig); List<SignalRecord> read(); void close(); 
}
  • 读者姓名。要使 Debezium 能够使用通道,请在连接器的 signal.enabled.channels 属性中指定此名称。
  • 初始化通道所需的特定配置、变量或连接。
  • 从通道读取信号。 SignalProcessor 类调用此方法来检索要处理的信号。
  • 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

九、Debezium 核心模块依赖项

自定义信令通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId><version>${version.debezium}</version> 
</dependency>
  • ${version.debezium} 表示 Debezium 连接器的版本。
  • 在 META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 中声明实现

十、部署自定义信令通道

先决条件

  • 有一个自定义信令通道 Java 程序。

程序

  • 要将自定义信号通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
  • 例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。

注意:

  • 要将自定义信号通道与多个连接器一起使用,必须将自定义信号通道 JAR 文件的副本放置在每个连接器的子目录中。

配置连接器以使用自定义信号通道

  • 将自定义信令通道的名称添加到 signal.enabled.channels 配置属性中。

十一、信号动作

可以使用信令来发起以下操作:

  • 将消息添加到日志中。
  • 触发临时增量快照。
  • 停止执行临时快照。
  • 暂停增量快照。
  • 恢复增量快照。
  • 触发临时阻塞快照。
  • 自定义动作。

有些信号并不与所有连接器兼容。

十二、记录信号

可以通过创建具有日志信号类型的信令表条目来请求连接器将条目添加到日志中。处理信号后,连接器将指定的消息打印到日志中。或者,可以配置信号,以便生成的消息包含流坐标。

表 4. 用于添加日志消息的信令记录示例

字段描述
id924e3ff8-2245-43ca-ba77-2af9af02fa07
typelog信号的动作类型。
data{“message”: “Signal message at offset {}”}message 参数指定要打印到日志的字符串。
如果您向消息添加占位符 ({}),它将被替换为流坐标。

十三、即席快照信号

可以通过创建具有执行快照信号类型的信号来请求连接器启动临时快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,临时快照是在连接器已经开始从数据库传输更改事件之后在运行时期间发生的。可以随时启动临时快照。

临时快照可用于以下 Debezium 连接器:

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表 5. 临时快照信号记录示例

字段
idd139b9b7-7777-4547-917d-e1775ea61d41
typeexecute-snapshot
data{“data-collections”: [“public.MyFirstTable”, “public.MySecondTable”]}

表 6. 即席快照信号消息示例

test_connector{“type”:“execute-snapshot”,“data”: {“data-collections”: [“public.MyFirstTable”], “type”: “INCREMENTAL”, “additional-conditions”:[{“data-collection”: “public.MyFirstTable”, “filter”:“color=‘blue’ AND brand=‘MyBrand’”]}}

其他资源

  • Db2 连接器增量快照
  • MongoDB 连接器增量快照
  • MySQL 连接器增量快照
  • Oracle 连接器增量快照
  • PostgreSQL 连接器增量快照
  • SQL Server 连接器增量快照

十四、特别快照停止信号

可以通过创建具有停止快照信号类型的信号表条目来请求连接器停止正在进行的临时快照。处理完信号后,连接器将停止当前正在进行的快照操作。

表 7. 停止临时快照信号记录示例

字段
idd139b9b7-7777-4547-917d-e1775ea61d41
typestop-snapshot
data{“type”:“INCREMENTAL”, “data-collections”: [“public.MyFirstTable”]}

必须指定信号的类型。数据收集字段是可选的。将数据收集字段留空以请求连接器停止当前快照中的所有活动。如果希望继续执行增量快照,但希望从快照中排除特定集合,请提供要排除的集合或正则表达式的名称的逗号分隔列表。连接器处理信号后,增量快照将继续,但它会排除指定的集合中的数据。

十五、增量快照

增量快照是一种特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照以块的形式捕获表,而不是一次捕获所有表。连接器使用水印方法来跟踪快照的进度。

通过以块的形式而不是在单个整体操作中捕获指定表的初始状态,增量快照比初始快照过程具有以下优势:

  • 当连接器捕获指定表的基线状态时,来自事务日志的近实时事件流将继续不间断。
  • 如果增量快照过程中断,可以从停止点恢复。
  • 可以随时启动增量快照。

十六、增量快照暂停信号

可以通过创建具有暂停快照信号类型的信号表条目来请求连接器暂停正在进行的增量快照。处理完信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将暂停在处理信号时的位置。

表 8. 暂停增量快照信号记录示例

字段
idd139b9b7-7777-4547-917d-e1775ea61d41
typepause-snapshot

必须指定信号的类型。数据字段被忽略。

十七、增量快照恢复信号

可以通过创建具有恢复快照信号类型的信号表条目来请求连接器恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。

表 9. 恢复增量快照信号记录示例

字段
idd139b9b7-7777-4547-917d-e1775ea61d41
typeresume-snapshot

十八、阻止快照信号

可以通过创建具有执行快照信号类型和具有值阻塞的 data.type 的信号来请求连接器启动临时阻塞快照。处理信号后,连接器运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,临时阻塞快照在连接器停止从数据库传输更改事件后在运行时发生。您可以随时启动临时阻止快照。

表 10. 阻塞快照信号记录示例

字段
idd139b9b7-7777-4547-917d-e1775ea61d41
typeexecute-snapshot
data{“type”: “blocking”, “data-collections”: [“schema1.table1”, “schema1.table2”], “additional-conditions”: [{“data-collection”: “schema1.table1”, “filter”: “SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC”}, {“data-collection”: “schema1.table2”, “filter”: “SELECT * FROM [schema1].[table2] WHERE column2 > 0”}]}

表 11. 阻塞快照信号消息示例

test_connector{“type”:“execute-snapshot”,“data”: {“type”: “blocking”}

十九、应用案例

  • Debezium系列之:实现增量快照incremental技术的详细步骤
  • Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
  • Debezium系列之:深入理解临时阻塞快照

更多Debezium实战应用可以参考博主Debezium专栏:

  • Debezium专栏,Debezium实战应用详细总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/312981.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FlyMcu串口下载使用

本实验采用的下载方式为串口下载&#xff0c;需要利用FlyMcu软件下载。 实际操作可以有以下两种方式硬件操作和软件操作&#xff0c;但是首先都要求生成对应的hex文件。 步骤一&#xff1a;将要下载的文件&#xff0c;编译成相应的hex文件。 当运行框跳出一行信息如下&#xf…

navicat premium历史版本下载及更新navicat premium15 永久(使用)有效期

1、navicat premium介绍 Navicat Premium 是一套可创建多个连接的数据库开发工具&#xff0c;让你从单一应用程序中同时连接 MySQL、Redis、MariaDB、MongoDB、SQL Server、Oracle、PostgreSQL 和 SQLite 。它与 GaussDB 、OceanBase 数据库及 Amazon RDS、Amazon Aurora、Amaz…

影视后期:Pr 调色处理之风格调色

写在前面 整理一些影视后期相关学习笔记博文为 Pr 调色处理中风格调色&#xff0c;涉及下面几个Demo 好莱坞电影电影感调色复古港风调色赛博朋克风格调色日系小清晰调色 理解不足小伙伴帮忙指正 简单地说就是害怕向前迈进或者是不想真正地努力。不愿意为了改变自我而牺牲目前所…

WPF+Halcon 培训项目实战(10):HS组件绘制图案

文章目录 前言相关链接项目专栏运行环境匹配图片模板匹配加载模板文件运行结果 绘制十字标 WPF HS组件绘制图像绘制和生成的区别 前言 为了更好地去学习WPFHalcon&#xff0c;我决定去报个班学一下。原因无非是想换个工作。相关的教学视频来源于下方的Up主的提供的教程。这里只…

将按键次数写入AT24C02,再读出并用1602LCD显示

#include<reg51.h> //包含单片机寄存器的头文件 #include<intrins.h> //包含_nop_()函数定义的头文件 sbit RSP2^0; //寄存器选择位&#xff0c;将RS位定义为P2.0引脚 sbit RWP2^1; //读写选择位&#xff0c;将RW位定义为P2.1引脚 s…

内存泄漏检测工具

1. vs/vc(windows下)自带的检测工具 将下面的语句加到需要调试的代码中 #define _CRTDBG_MAP_ALLOC // 像一个开关,去开启一些功能,这个必须放在最上面 #include <stdlib.h> #include <crtdbg.h>// 接管new操作符 原理: 就是使用新定义的DBG_NEW去替换代码中的n…

uniapp中的uview组件库丰富的Form 表单用法

目录 基本使用 #Form-item组件说明 #验证规则 #验证规则属性 #uView自带验证规则 #综合实战 #校验错误提示方式 #校验 基本使用 此组件一般是用于表单验证使用&#xff0c;每一个表单域由一个u-form-item组成&#xff0c;表单域中可以放置u-input、u-checkbox、u-radio…

单片机数据发送程序

#include<reg51.h> //包含单片机寄存器的头文件 /***************************************************** 函数功能&#xff1a;向PC发送一个字节数据 ***************************************************/ void Send(unsigned char dat) { SBUFdat; whil…

第6课 用window API捕获麦克风数据并加入队列备用

今天是2024年1月1日&#xff0c;新年的第一缕阳光已经普照大地&#xff0c;祝愿看到这篇文章的所有程序员或程序爱好者都能在新的一年里持之以恒&#xff0c;事业有成。 今天也是我加入CSDN的第4100天&#xff0c;但回过头看一看&#xff0c;这么长的时间也没有在CSDN写下几篇…

【HarmonyOs Arkts笔记】Arkts ForEach循环使用

说明 ForEach循环数组对象时 要指定对象的唯一标识 例如 id&#xff0c;否则只会显示第一个 State tabsList: object[] [{ name: 砍价活动, id: 1, icon: https://php-b2c.likeshop.cn/uploads/images/2022062414322367e6a5479.png },{ name: 拼团活动, id: 2, icon: https:…

KFold解释和代码实现

KFold解释和代码实现 文章目录 一、KFold是什么&#xff1f;二、 实验数据设置2.1 实验数据生成代码2.2 代码结果 三、实验代码3.1 实验代码3.2 实验结果3.3 结果解释 四、总结 一、KFold是什么&#xff1f; 0&#xff0c;1&#xff0c;2&#xff0c;3&#xff1a;每一行表示测…

最优化方法Python计算:无约束优化应用——神经网络回归模型

人类大脑有数百亿个相互连接的神经元&#xff08;如下图(a)所示&#xff09;&#xff0c;这些神经元通过树突从其他神经元接收信息&#xff0c;在细胞体内综合、并变换信息&#xff0c;通过轴突上的突触向其他神经元传递信息。我们在博文《最优化方法Python计算&#xff1a;无约…