FlinkCDC数据实时同步Mysql到ES

考大家一个问题,如果想要把数据库的数据同步到别的地方,比如es,mongodb,大家会采用哪些方案呢? :::

  1. 定时扫描同步?

  2. 实时日志同步?

定时同步是一个很好的方案,比较简单,但是如果对实时要求比较高的话,定时同步就有点不合适了。今天给大家介绍一种实时同步方案,就是是使用flinkcdc 来读取数据库日志,并且写入到elasticsearch中。

1.什么是flinkcdc?

Flink CDC(Change Data Capture)是指通过 Apache Flink 实现的一种数据变化捕获技术。CDC 可以实时捕获数据库中的数据变化,如插入、更新、删除操作,并将这些变化数据流式地传输到其他系统或存储中。通过 Flink CDC,用户可以实时监控数据库中的数据变化,并将这些变化数据用于实时分析、ETL(Extract, Transform, Load)等应用场景。Flink CDC 通常用于构建实时数据管道,帮助用户实现实时数据同步和分析。

2.flinkcdc发展趋势?

目前在github 上大概有5k 的star,也有越来越多的人使用。

3.flinkcdc有什么优势?

说到实时同步,canal 是比较常用的方案

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 这句介绍有几个关键字:增量日志,增量数据订阅和消费。

canal的把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

那么 flinkcdc 和canal 对比,有什么不同呢?

这是网上的一个对比。可以看到 flinkcdc 和canal 一样,也是通过读取数据库日志的方式做到实时同步的,这个和很多实时同步的工具原理相同,比如 ogg debezium 都是这样做的,flinkcdc 的优势是基于flink 这个强大的实时计算引擎,可以做到集群部署,高可用等等,并且社区活跃,支持的平台多,像 mysql oracle mongodb 主流数据库都是支持的。而canal只支持mysql。

还有一个优势,flinkcdc 是基于java实现的,背靠大数据这个大平台,解决方案也是比较多的。源码阅读修改起来也是比较方便的。

4.一个例子

光说不练假把式,简单的写一个把mysql 数据实时同步到es的例子,使用flinksql的方式,只需要简单的几行sql

依赖
flink-1.15.0
flink-sql-connector-elasticsearch7-1.15.0.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
mysql 5.7
es 7.9.3

安装好flink 之后,把 flink-sql-connector-elasticsearch7-1.15.0.jar flink-sql-connector-mysql-cdc-2.2.1.jar 上传到 flink lib 目录 启动flink

./start-cluster.sh

打开flink sql 窗口

./start-cluster.sh

创建和mysql 关联的表

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'products');
CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders');

创建和es 同步的表

CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://192.168.91.134:9200','index' = 'enriched_orders');

创建读取mysql写入es任务

INSERT INTO enriched_ordersSELECT o.*, p.name, p.descriptionFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.id;

执行这个任务后,mysql 的数据就能实时同步至es了

当然数据源也是支持很多种,比如 oracle mongodb sqlserver 写入端也支持 es kafka hive 等等,看大家需要。想我们的业务场景,是先将mysql 数据同步到kafka,然后再消费kafka 消息,把数据写入到es, hive,starrocks 等等。并且使用了checkpoint 做为断点恢复的保障。

5.最后

附上一些涉及的到网址,方便大家查阅

flinkcdc 官网 

flinkcdc github

flink 官网

flink 文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/193872.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

URAT串口通信协议

UART是异步串行全双工总线,面向设备和设备之间的连接 配置相关内容 1、串口为串行通讯方式,代表一个时钟周期,只可以收发一位数据 2、115200代表什么,以及115200单位 单位:bps(比特率、二进制/秒) 115200代表&#…

我的创作纪念日——365天

机缘 最开始我写博客没有什么特别的原因,主要是因为以下几点: 练习自己的语言组织能力 记录自己学习生活中学到的知识 主要还是想找一个好的保存 Markdown 笔记的平台。 最终我选择了 CSDN,一来是因为 CSDN 对 Markdown 语法的支持较为全面…

Rust开发——切片(slice)类型

1、什么是切片 在 Rust 中,切片(slice)是一种基本类型和序列类型。在 Rust 官方文档中,切片被定义为“对连续序列的动态大小视图”。 但在rust的Github 源码中切片被定义如下: 切片是对一块内存的视图,表…

【小沐学GIS】电子海图OpenCPN源代码编译和运行(VS2017 + Win10)

1、简介 免费的开源海图仪和船用GPS导航软件 https://opencpn.org/ 1.1 OpenCPN概述 OpenCPN是一款自由软件(GPLv2),用于创建简洁的海图绘图仪和导航软件,可以在航行过程中使用或者作为计划工具。OpenCPN提供大量免费海图下载&a…

Linux系统编程 day02 vim、gcc、库的制作与使用

Linux系统编程 day02 vim、gcc、库的制作与使用 01. vim0101. 命令模式下的操作0102. 切换到文本输入模式0103. 末行模式下的操作0104. vim的配置文件 02. gcc03. 库的制作与使用0301. 静态库的制作与使用0302. 动态库(共享库)的制作与使用 01. vim vim是一个编辑器&#xff0…

ESP32 Arduino实战协议篇-搭建独立的 Web 服务器

在此项目中,您将创建一个带有 ESP32 的独立 Web 服务器,该服务器使用 Arduino IDE 编程环境控制输出(两个 LED)。Web 服务器是移动响应的,可以使用本地网络上的任何浏览器设备进行访问。我们将向您展示如何创建 Web 服务器以及代码如何逐步工作。 项目概况 在直接进入项目…

Linux基础全整理 从入门到放弃,一些想说的话

阅读目录 断更后一些想说的话用户useraddpasswdpasswd文件详解 chageusermoduserdelshadow 文件格式切换用户 用户组groupaddgroup文件格式groupmodgroupdel登陆远程机器 磁盘RAIDraid0(安装系统)raid1(存放数据)raid 5&#xff0…

Linux - 进一步理解 文件系统 - inode - 机械硬盘

详谈机械磁盘 在上一篇博客当中,已经对 用户级缓冲区 和 系统缓冲区 的区别,和 初步认识 C 库函数 封装的 文件接口这些做了阐述。具体可以参考下述博客: Linux - 用户级缓冲区和系统缓冲区 - 初步理解Linux当中文件系统-CSDN博客 本博客将…

PWM实验

PWM相关概念 PWM:脉冲宽度调制定时器 脉冲:方波信号,高低电平变化产生方波 周期:高低电平变化所需要时间 频率:1s钟可以产生方波个数 占空比:在一个方波内,高电平占用的百分比 宽度调制:占…

java--拼图游戏

1、了解拼图游戏基本功能: 拼图游戏内容由若干小图像块组成的,通过鼠标点击图像块上下左右移动,完成图像的拼凑。 2、拼图游戏交互界面设计与开发: 通过创建窗体类、菜单、中间面板和左右面板完成设计拼图的交互界面 &#xff…

C++ Qt 学习(十):Qt 其他技巧

1. 带参数启动外部进程 QProcess 用于启动外部进程int QProcess::execute(const QString &program, const QStringList &arguments);QObject *parent; ... QString program "./path/to/Qt/examples/widgets/analogclock"; QStringList arguments; argument…

mysql 实现去重

个人网站 首发于公众号小肖学数据分析 1、试题描述 数据表user_test如下,请你查询所有投递用户user_id并且进行去重展示,查询结果和返回顺序如下 查询结果和返回顺序如下所示 解题思路: (1) 对user_id列直接去重: &#xff…