【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

  • 1)环境准备
  • 2)准备相关 jar 包
  • 3)实现场景
  • 4)准备工作
    • 4.1.Mysql
    • 4.2.Kafka
  • 5)Flink-Sql
  • 6)验证

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (`id` int(11) NOT NULL AUTO_INCREMENT,`eventId` varchar(255) DEFAULT NULL,`eventStDt` varchar(255) DEFAULT NULL,`bak6` varchar(255) DEFAULT NULL,`bak7` varchar(255) DEFAULT NULL,`businessId` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`bak1` varchar(255) DEFAULT NULL,`bak2` varchar(255) DEFAULT NULL,`bak13` varchar(255) DEFAULT NULL,`bak14` varchar(255) DEFAULT NULL,`bak11` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;CREATE TABLE source_mysql_test(id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH('connector' = 'mysql-cdc','hostname' = '${ip}','port' = '${port}','database-name' = 'test','table-name' = 'mysql2kafka_cdc_test','username' = '${username}','password' = '${password}','scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'upsert-kafka','topic' = 'test','sink.parallelism' = '3','key.format' = 'json','value.format' = 'json','properties.bootstrap.servers' = '${kafka-bootstrap-server}','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka','metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/285829.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端视角看 Docker :在国内的基础配置教程(含国内镜像源)

引言 在国内使用Docker时,直接从Docker Hub拉取镜像可能会遇到网络速度慢的问题。配置国内的镜像加速器可以显著提升拉取速度。本教程将指导您完成安装Docker后的基础配置,特别是设置国内镜像加速器。 1. 安装Docker 确保您已在系统上安装Docker。根…

不做数据采集,不碰行业应用,专注数字孪生PaaS平台,飞渡科技三轮融资成功秘诀

12月15日,飞渡科技在北京举行2023年度投资人媒体见面会,全面分享其产品技术理念与融资之路。北京大兴经开区党委书记、管委会主任常学智、大兴经开区副总经理梁萌、北京和聚百川投资管理有限公司(以下简称“和聚百川”)投资总监严…

UE5 C++(七)— UObject、UGameInstance实例化

文章目录 UObject实例化创建一个MyObject类在Default Pawn Class 中实现MyObject中参数调用 UGameInstance实例化创建GameInstance UObject实例化 创建一个MyObject类 在Default Pawn Class 中实现 注意:要实现运行时调用可在这里提前配置,具体参考之前…

力扣14题-最长公共前缀[简单]

题目描述 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀,返回空字符串 ""。 示例 1: 输入:strs ["flower","flow","flight"] 输出:"fl" 示例 2&am…

spring之面向切面:AOP(2)

学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…

一文详细介绍Ehcache

title: Ehcache 快速入门 categories: 编程 Java 中间件 缓存 tags: Java 中间件 缓存 Ehcache abbrlink: 2720adf1 date: 2022-02-17 22:34:30 permalink: /pages/c4647d/ Ehcache 快速入门 EhCache 是一个纯 Java 的进程内缓存框架,具有快速、精干等特点&#xff…

【mask转json】文件互转

mask图像转json文件 当只有mask图像时,可使用下面代码得到json文件 import cv2 import os import json import sysdef func(file:str) -> dict:png cv2.imread(file)gray cv2.cvtColor(png, cv2.COLOR_BGR2GRAY)_, binary cv2.threshold(gray,10,255,cv2.TH…

metabase filter

What’s this for? Variables in native queries let you dynamically replace values in your queries using filter widgets or through the URL. 本机查询中的变量允许您使用过滤器小部件或通过 URL 动态替换查询中的值。 Variables {{variable_name}} creates a variable…

后端低代码平台探索总结

业务需求快速变化的背景 我们在对业务需求进行梳理后,在进行程序设计时,对于将来可能发生变化的常量、变量、阀值、开关、条件、公式等等,可能会配置在环境变量或数字字典来支持可配置。但是需求变化往往会更加复杂、更加不可预测&#xff0…

Qt实现动画的2种方式

由于我之前是写java的所以在学习Qt的时候感觉会有点熟悉,因为Qt就是 用c写,而java底层也是c实现的 先看效果: 一、使用QMovie 这种方式我目前是用来加载gif图的,很简单噢,只不过我是加载的本地的路径,如…

减速机振动相关标准 - 笔记

参考标准:国家标准|GB/T 39523-2020 减速机的振动标准与发动机不同,摘引: 原始加速度传感器波形 可以明显看到调幅波 它的驱动电机是300Hz~2000Hz范围的。这个采样时间是5秒,看分辨率至少1024线。可分出500条谱线。 频谱部分 …

持续集成交付CICD:基于 GitLabCI 与 JenkinsCD 实现后端项目发布

目录 一、实验 1. GitLabCI环境设置 2.优化GitLabCI共享库代码 3.JenkinsCD 发布后端项目 4.再次优化GitLabCI共享库代码 5.JenkinsCD 再次发布后端项目 一、实验 1. GitLabCI环境设置 (1)GitLab给后端项目添加CI配置路径 (2&#xf…