FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1)

 本文介绍了不同源单表-单表同步,不同源多表-单表同步。

注:此版本支持火焰图

Flink版本:1.17.1

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.17.1-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安装Flink步骤详见文章第二篇

支持的mysql版本: 

一、 数据源ip为***.51的源表,同步数据到数据源ip为***.50的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo50 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver'
>  );

[INFO] Execute statement succeed.

 最后创建同步关系:

INSERT INTO target_alarminfo50 SELECT * FROM source_alarminfo51;

如下:

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

打开火焰图:

编辑flink-conf.yaml:最后面添加 

rest.flamegraph.enabled: true

配置后重启flink服务,重新创建任务。

火焰图效果:

数据同步效果:

源表:

目标表数据:首次数据全量,后面数据变更增量 

 注:

在分析火焰图时,可以关注以下几点:
函数的执行时间:纵向的轴显示了函数的嵌套层级,越往下表示越深层的函数调用。横向轴表示时间,通过不同颜色的方块来表示函数的执行时间。
热点函数:寻找占据执行时间大部分的函数,这些函数可能是需要优化的关键点。
函数之间的关系:观察函数之间的调用关系,查看是否有不必要的函数调用或循环。
I/O 操作:关注是否有大量的数据读取、写入或网络通信,这可能是性能瓶颈的来源。
根据火焰图的分析结果,您可以进一步定位和排查潜在的性能问题,并在代码、配置或资源分配方面进行优化。
请注意,为了准确分析火焰图,建议在负载较高的情况下生成火焰图,并保持足够的监视时间。此外,Flink 的火焰图功能在生产环境中可能会造成一定的开销,因此建议在测试或开发环境中使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20357.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos7根分区、文件系统扩容

1、 输入lsblk,查看到新硬盘sde,根目录现71G. 2、 创建分区fidisk /dev/sde 3、 刷新分区 partprobe /dev/sde,并创建物理卷 pvcreate /dev/sde1 4、 查看卷组名 vgdisplay 5、 将物理卷扩展到卷组 vgextend centos /dev/sde1 6、 查看逻辑巻…

拉丁语翻译器有哪些?一分钟快速分享

拉丁语翻译器有哪些?拉丁语是一种古老的语言,现在已经不再作为主要的交流工具使用。然而,在某些学术领域和文化传承中,拉丁语仍然具有重要地位。因此,当我们需要翻译拉丁语时,使用翻译器可以提高效率和准确…

立式oled拼接屏有哪些产品优点?

葫芦岛oled拼接屏是一种高清晰度的显示屏,由多个oled屏幕拼接而成。它可以用于广告牌、展览、演示、会议等场合,具有高亮度、高对比度、高色彩饱和度、高刷新率等优点,能够吸引人们的眼球,提高信息传递效果。 葫芦岛oled拼接屏的优…

Nodejs快速搭建简单的HTTP服务器,并发布公网远程访问

文章目录 前言1.安装Node.js环境2.创建node.js服务3. 访问node.js 服务4.内网穿透4.1 安装配置cpolar内网穿透4.2 创建隧道映射本地端口 5.固定公网地址 前言 Node.js 是能够在服务器端运行 JavaScript 的开放源代码、跨平台运行环境。Node.js 由 OpenJS Foundation&#xff0…

ens33没有inet地址

1)切换到根用户 su - root 按提示输入密码(不切换到根用户没有权限修改文件) (2)输入cd /etc/sysconfig/network-scripts/ (3)输入vi ifcfg-ens33 ifcfg-ens33 (4)光标移…

SQL力扣练习(六)

目录 1. 部门工资前三高的所有员工(185) 题解一(dense_rank()窗口函数) 题解二(自定义函数) 2.删除重复的电子邮箱(196) 题解一 题解二(官方解析) 3.上升的温度(197) 解法一(DATEDIFF())…

阿里云AliYun物联网平台使用-设备添加以及模拟设备端上云

一、前言 上一篇文章提到,我们已经申请了免费的阿里云平台,下面需要将我们的设备在阿里云上进行注册和申请,以便于我们的数据上云。 二、步骤 注册产品(设备模型) 在产品页面,点击 "创建产品" 。…

windows下使用arp 协议

/ //自动扫描局域网存活主机 本程序是利用arp协议去获取局域网中的存活主机 arp协议概述 地址解析协议,即ARP(Address Resolution Protocol),是根据IP地址获取物理地址的一个TCP/IP协议。主机发送信息时将包含目标IP地址的ARP请…

python散记

"""字符串格式化的两种方法"""name"sans" age18 math_score90.56 english_score88.8print(f"这个学生的名字叫{name},年龄{age},数学分数是{math_score},总分是{math_scoreenglish_score}") print("这个学生的名字叫%s…

克服 ClickHouse 运维难题:ByteHouse 水平扩容功能上线

前言 对于分析型数据库产品,通过增加服务节点实现集群水平扩容,并提升集群性能和容量,是运维的必要手段。 但是对于熟悉 ClickHouse 的工程师而言,听到“扩容”二字一定会头疼不已。开源 ClickHouse 的 MPP 架构导致扩容成本高&…

python接口自动化(三十四)-封装与调用--函数和参数化(详解)

简介 前面虽然实现了参数的关联,但是那种只是记流水账的完成功能,不便于维护,也没什么可读性,随着水平和技能的提升,再返回头去看前边写的代码,简直是惨不忍睹那样的代码是初级入门的代码水平都达不到。接下…

杂记:逆向一块FPGA核心板

最近太热了,实在无心看书。阵列书丢一边看不进去,还买了几本统计信号的甚至都没开始看(笑),躺在床上玩玩手机摆烂,看到某黄色APP上有老板卖拆机的板子,价格美丽,美中不足的是没有资料…