Fink CDC数据同步(二)MySQL数据同步

1 开启binlog日志

2 数据准备

use bigdata;
drop table if exists user;CREATE TABLE `user`(`id` INTEGER NOT NULL AUTO_INCREMENT,`name` VARCHAR(20) NOT NULL DEFAULT '',`birth` VARCHAR(20) NOT NULL DEFAULT '',`gender` VARCHAR(10) NOT NULL DEFAULT '',PRIMARY KEY(`id`)
);
ALTER TABLE user AUTO_INCREMENT = 1001;insert into user values(default , '东契奇' , '1995-01-01' , '男');
insert into user values(default , '斯蒂芬' , '1996-12-21' , '男');
insert into user values(default , '里奥梅西' , '1993-05-10' , '男');
insert into user values(default , '凯里欧文' , '1994-08-06' , '男');
insert into user values(default , '张淋艳' , '1997-12-01' , '女');
insert into user values(default , '王珊珊' , '1995-03-01' , '女');
insert into user values(default , '唐佳丽' , '1994-07-01' , '女');
insert into user values(default , '杨力维' , '1995-10-20' , '女');select * from user;

3 jar包依赖

在flink/lib目录下添加依赖:

flink-sql-connector-mysql-cdc-2.3.0.jar

下载地址:

Central Repository: com/ververica/flink-sql-connector-mysql-cdc

4 启动sql-client

# 启动服务
/opt/flink/flink-1.16.2/bin/start-cluster.sh 
# 启动sql-client
/opt/flink/flink-1.16.2/bin/sql-client.sh

设置模式

set sql-client.execution.result-mode = tableau;

设置checkpont

set execution.checkpointing.interval=30sec;

建mysql的映射表

CREATE TABLE if not exists mysql_user (id     STRING,name   STRING,birth  STRING,gender    STRING,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector'= 'mysql-cdc','hostname'= '192.168.0.1','port'= '3306','username'= 'user','password'='password','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'bigdata1','table-name'= 'user'
); 

执行查询语句,会生成一个flink job任务

select * from mysql_user; 

5 常用参数表

参数名

必填

默认值

类型

参数描述

connector

String

指定connector,这里填 mysql-cdc

hostname

String

MySql server 的主机名或者 IP 地址

username

String

连接 MySQL 数据库的用户名

password

String

连接 MySQL 数据库的密码

database-name

String

需要监控的数据库名,支持正则表达式

table-name

String

需要监控的表名,支持正则表达式

port

3306

Integer

MySQL 服务的端口号

server-id

Integer

当开启scan.incremental.snapshot.enabled时,建议指定server-id;server-id 可以是单个值,如5400; 也可以提供数值范围,如5400-5408

scan.incremental.snapshot.enabled

TRUE

Boolean

增量快照是读取表快照的新机制;和旧的快照读相比有以下优点:1. 并行读取 2. 支持checkpoint 3. 不需要锁表;当需要并行读取时,server-id需要设置数值范围,如5400-5408

scan.incremental.snapshot.chunk.size

8096

Integer

表快照的块大小

scan.snapshot.fetch.size

1024

Integer

每次读表接受的最大值

scan.startup.mode

initial

String

MySQL CDC 启动模式,有效值:initial 和 latest-offset

connect.timeout

30s

Duration

connector 连接 MySQL 服务的最长等待超时时间

connect.max-retries

3

Integer

connector 创建 MySQL 连接的重试次数

connection.pool.size

20

Integer

连接池的大小


系列文章 

 Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/460121.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营29期|day43 任务以及具体任务

第九章 动态规划 part05 1049. 最后一块石头的重量 II class Solution {public int lastStoneWeightII(int[] stones) {int sum 0;for (int i : stones) {sum i;}int target sum >> 1;//初始化dp数组int[] dp new int[target 1];for (int i 0; i < stones.lengt…

Java 将TXT文本文件转换为PDF文件

与TXT文本文件&#xff0c;PDF文件更加专业也更适合传输&#xff0c;常用于正式报告、简历、合同等场合。项目中如果有使用Java将TXT文本文件转为PDF文件的需求&#xff0c;可以查看本文中介绍的免费实现方法。 免费Java PDF库 本文介绍的方法需要用到Free Spire.PDF for Java…

【力扣】快乐数,哈希集合 + 快慢指针 + 数学

快乐数原题地址 方法一&#xff1a;哈希集合 定义函数 getNext(n) &#xff0c;返回 n 的所有位的平方和。一直执行 ngetNext(n) &#xff0c;最终只有 2 种可能&#xff1a; n 停留在 1 。无限循环且不为 1 。 证明&#xff1a;情况 1 是存在的&#xff0c;如力扣的示例一…

寒假作业-day5

1>现有无序序列数组为23,24,12,5,33,5347&#xff0c;请使用以下排序实现编程 函数1:请使用冒泡排序实现升序排序 函数2:请使用简单选择排序实现升序排序 函数3:请使用直接插入排序实现升序排序 函数4:请使用插入排序实现升序排序 代码&#xff1a; #include<stdio.h&g…

金融行业专题|证券超融合架构转型与场景探索合集(2023版)

更新内容 更新 SmartX 超融合在证券行业的覆盖范围、部署规模与应用场景。新增操作系统信创转型、Nutanix 国产化替代、网络与安全等场景实践。更多超融合金融核心生产业务场景实践&#xff0c;欢迎阅读文末电子书。 在金融行业如火如荼的数字化转型大潮中&#xff0c;传统架…

数据库表操作

建表删表 建表语句 --建表语句 create table Student (ID INTEGER constraint Student_KEY_1 primary key,--设定主键&#xff0c;同时会自动创建唯一索引NAME VARCHAR2(50) NOT NULL , --设定非空SEX CHAR(1) constraint SEX_CH…

ctfshow-命令执行(web118-web122)

web118 是一个窗口 查看源码 发现是system($code) 命令执行 经过测试禁用了很多东西 很多很多 $IFS可以 思路就是使用系统变量 构造我需要的poc 这些都是系统的环境变量 这是答案${PATH:~A}${PWD:~A}$IFS????.??? 解释一下 PATH变量输出结尾一般都是n 因为网站默认根目…

解决“使用Edge浏览器每次鼠标点击会出现一个黑色边框”的问题

目录 一 问题描述 二 解决方案 三 方案来源 四 参考资料 & AI工具 一 问题描述 为了方便进行收藏夹同步&#xff0c;开始从Chrome浏览器切换到Edge浏览器。在使用Edge浏览器过程中发现“每次鼠标点击会出现一个黑色边框”&#xff08;效果如下图所示&#xff09;&#…

使用异步命名管道通信的实例

记录一个使用异步命名管道通信的实例。代码参考了 MSDN 的文档&#xff1a;使用完成例程的命名管道服务器 - Win32 apps | Microsoft Learn。 服务端代码 #include <windows.h> #include <stdio.h> #include <tchar.h> #include <strsafe.h>#define…

containerd中文翻译系列(十五)转运服务

传输服务是一种简单灵活的服务&#xff0c;可用于在源和目的地之间传输人工制品对象。灵活的应用程序接口&#xff08;API&#xff09;允许传输接口的每个实施方案决定是否可以在源和目的地之间进行传输。这样&#xff0c;实现者就可以直接添加新功能&#xff0c;而无需对应用程…

高斯伪谱C++封装库开源!

Windows x64/86 C无依赖运行高斯伪谱法求解最优控制问题&#xff0c;你只需要ElegantGP! Author: Y. F. Zhang His Github: https://github.com/ZYunfeii 写在前面 这个库在你下载它的那一时刻起不再依赖任何其他代码&#xff0c;直接可用来构建C的最优控制问题并进行求解。…

机器学习--K-近邻算法常见的几种距离算法详解

文章目录 距离度量1 欧式距离(Euclidean Distance)2 曼哈顿距离(Manhattan Distance)3 切比雪夫距离 (Chebyshev Distance)4 闵可夫斯基距离(Minkowski Distance)5 标准化欧氏距离 (Standardized EuclideanDistance)6 余弦距离(Cosine Distance)7 汉明距离(Hamming Distance)【…