Debezium-Embedded 实时监控MySQL数据变更

1.Debezium-Embedded 简介

        Debezium连接器的操作通常是将它们部署到Kafka Connect服务,并配置一个或多个连接器来监控上游数据库,并为它们在上游数据库中看到的所有更改生成数据更改事件。这些数据更改事件被写入Kafka,在那里它们可以被许多不同的应用程序独立使用。Kafka Connect提供了出色的容错性和可扩展性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终在运行。例如,即使集群中的一个Kafka Connect端点出现故障,其余的Kafka连接端点也会重新启动以前在现已终止的端点上运行的任何连接器,从而最大限度地减少停机时间并消除管理活动。

        并不是每个应用程序都需要这种级别的容错和可靠性,他们可能不想依赖外部的Kafka代理和Kafka Connect服务集群。相反,一些应用程序更喜欢将Debezium连接器直接嵌入到应用程序空间中。他们仍然想要相同的数据更改事件,但更喜欢让连接器将它们直接发送到应用程序,而不是将它们保存在Kafka中。

        这个Debezium-Embedded模块定义了一个小型库,允许应用程序轻松配置和运行debezium连接器。

2.MySQL端配置

2.1 开启日志

        MySQL开启日志配置可参考MySQL 主从配置-CSDN博客实现。

show variables like 'log_%';

2.2 创建监控账号并授权

#创建账号
create user debezium@'%' identified with mysql_native_password by 'wsx-123';
#给账号授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
#刷新权限
FLUSH PRIVILEGES;

3.应用端开发

3.1 maven 引用debezium-embedded

<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium-embedded.version}</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${debezium-embedded.version}</version>
</dependency>

3.2 代码开发

package com.dayesmart.dataplusjava.util;import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;@Slf4j
public class DebeziumTest {public static void main(String[] args) {Executor executor = Executors.newSingleThreadExecutor();Configuration config = Configuration.create()/* begin engine properties */.with("connector.class","io.debezium.connector.mysql.MySqlConnector").with("offset.storage","org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename","E:/tmp/debezium/offset.dat").with("offset.flush.interval.ms", 60000)/* begin connector properties */.with("name", "my-sql-connector").with("database.hostname", "127.0.0.1").with("database.port", 3307).with("database.user", "debezium").with("database.password", "wsx-123").with("database.connectionTimeZone", "Asia/Shanghai").with("database.server.id", 85744).with("database.include.list","test").with("snapshot.mode","initial").with("database.server.name","weisx").with("database.history","io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename","E:/tmp/debezium/schemahistory.dat").build();// Create the engine with this configuration ...EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(new EmbeddedEngine.ChangeConsumer(){@Overridepublic void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {log.info("{}",list);}}).using((success,message,error) ->{log.info("success:{},message:{},error:{}",success,message,error);}).build();// Run the engine asynchronously ...executor.execute(engine);}}


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/178523.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三大开源向量数据库大比拼

向量数据库具有一系列广泛的好处&#xff0c;特别是在生成式人工智能方面&#xff0c;更具体地说&#xff0c;是在大语言模型&#xff08;LLM&#xff09;方面。这些好处包括先进的索引和精确的相似度搜索&#xff0c;有助于交付强大的先进项目。 本文将对三种开源向量数据库&…

Thread的常用方法

Thread获取和设置线程名称 Thread类获得当前线程的对象 这个方法是在哪个线程执行中调用的&#xff0c;就会得到哪个线程对象。 Thread类的线程休眠方法 public class MyThread extends Thread{public MyThread() {}public MyThread(String name) {// 为当前线程对象设置名称…

第十八章 Swing程序设计

Swing用于开发桌面窗体程序&#xff0c;是JDK的第二代GUI框架&#xff0c;其功能比JDK第一代GUI框架AWT更为强大、性能更加优良。但因为Swing技术推出时间太早&#xff0c;其性能、开发效率等不及一些其他流行技术&#xff0c;所以目前市场上大多数桌面窗体程序都不是由Java开发…

搭建成功simulink-stm32硬件在环开发环境

本次实验所使用的软件版本和硬件平台参数如下&#xff1a; Matlab版本: 2021b STM32硬件平台&#xff1a;YF_STM32_Alpha 1R4(参考自STM32 Nucleo F103RB官方开发板) YF_STM32_Alpha开发板 STM32 Nucleo F103RB 开发板 2.1 STM32硬件支持包下载 读者朋友平时使用的是和谐版M…

RGB转Bayer,一个小数点引发的血案

前几天写了一个RGB数据转Bayer格式的函数&#xff0c;经过测试功能正常。后来把这个函数用到一个数据库构建中&#xff0c;结果数据库出来的结果一直是一张黑图&#xff0c;追查了好几个小时&#xff0c;总算把这只虫子找出来了&#xff0c;原来是一个整数后面的小数点作祟。 …

MySQL2——喵喵期末不挂科

希望你开心&#xff0c;希望你健康&#xff0c;希望你幸福&#xff0c;希望你点赞&#xff01; 最后的最后&#xff0c;关注喵&#xff0c;关注喵&#xff0c;关注喵&#xff0c;大大会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的…

【数据结构】树与二叉树(十一):二叉树的层次遍历(算法LevelOrder)

文章目录 5.2.1 二叉树二叉树性质引理5.1&#xff1a;二叉树中层数为i的结点至多有 2 i 2^i 2i个&#xff0c;其中 i ≥ 0 i \geq 0 i≥0。引理5.2&#xff1a;高度为k的二叉树中至多有 2 k 1 − 1 2^{k1}-1 2k1−1个结点&#xff0c;其中 k ≥ 0 k \geq 0 k≥0。引理5.3&…

物联网AI MicroPython学习之语法 GPIO输入输出模块

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; GPIO 介绍 模块功能: GPIO通用输入输出。 接口说明 GPIO - 构建GPIO对象 函数原型&#xff1a;Pin(port, dir , pull)参数说明&#xff1a; 参数类型必选参数&#xff1f;说明portintY对应开发板的引脚号…

热烈庆祝瑞森半导体成立10周年

瑞森半导体10年芯路&#xff0c;衷心感谢全球合作伙伴、 客户、员工、朋友的帮助与支持。 弹指一挥间&#xff0c;瑞森半导体已在功率半导体行业奋勇前行了十年。3650个白天与黑夜&#xff0c;瑞森半导体在风雨兼程中砥砺前行&#xff0c;在倾情奉献中不负初心。十年里有太多的…

Spring Boot MyBatis Plus 配置数据源详解

文章目录 1. 引入 MyBatis Plus 依赖2. 数据源配置3. MyBatis Plus 配置4. 动态数据源配置&#xff08;多数据源&#xff09;5. 小结 &#x1f389;欢迎来到架构设计专栏~Spring Boot MyBatis Plus 配置数据源详解 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&…

智慧工地综合管理平台-项目开发管理规范

目的 本规范制定旨在规范项目的开发流程,提高软件开发质量和效率,降低开发成本和风险。该规范包括但不限于以下几个方面: 项目管理 包括项目计划、需求分析、设计、开发、测试、发布等环节,以及项目进度、质量和风险管理等方面项目计划管理:制定项目计划,包括确定项目目…

Wireshark抓包工具配置以及MQTT抓包分析

1、Wireshark抓包工具使用 打开Wireshark选择&#xff0c;需要抓取的物理网卡&#xff0c;添加过滤设置。 单击“捕获”&#xff0c;选择选项&#xff0c;输入需要捕获的IP地址和端口号。 如&#xff1a; ip host 10.60.4.45 and tcp port 1883 ip host 10.60.4.45 and http p…