FLink1.17-Kafka实时同步到MySQL实践

news/2024/9/21 3:29:48/文章来源:https://www.cnblogs.com/yeyuzhuanjia/p/18370009

1.组件版本

组件

版本

Kafka

3.7.0

Flink

1.17.0

MySQL

8.0.32

 

2.Kafka生产数据

./kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic   kafka_test_table2

>{"id":123,"test_age":33}

>{"id":125,"test_age":28}

>{"id":126,"test_age":18}

 

3.Kafka消费数据

./kafka-console-consumer.sh  --bootstrap-server  hadoop01:9092,hadoop02:9092:hadoop03:9092  --topic kafka_test_table2 --from-beginning

 

4.MySQL创建表

 

CREATE TABLE `test_table2` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`test_age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;

 

 

5.Flink 启动任务

cd /data/flink-1.17.0/bin/

./start-cluster.sh

 

./sql-client.sh

 

set sql-client.execution.result-mode=tableau ;

create Table kafka_test_table2 (

id    int,

test_age   int

)

with (

'connector' = 'kafka',

'topic' = 'kafka_test_table2',

'scan.startup.mode' = 'earliest-offset',

'properties.bootstrap.servers' = 'hadoop01:9092,hadoop02:9092,hadoop03:9092',

'properties.group.id' = 'group01',

'format' = 'json'

);

 

Create Table rds_test_table2 (

id    int,

test_age   int,

PRIMARY KEY (id)  NOT ENFORCED

)

with (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://192.168.132.22:3306/test?serverTimezone=Asia/Shanghai',

'driver' = 'com.mysql.cj.jdbc.Driver',

'username' = 'root',

'password' = 'Root@1234',

'table-name'= 'test_table2'

);

 

insert into  rds_test_table2  select * from  kafka_test_table2 ;

 

6.Flink WebUI

登录Flink WebUI页面,查看任务是否启动成功

 

点击查看任务是否报错

 

如果出现The server time zone value ‘�й���׼ʱ��’ is unrecognized or represents more than one time zoneMySQLurl连接字符串中添加?serverTimezone=Asia/Shanghai即可解决。

 

7.MySQL验证

登录MySQL数据库,查看数据是否同步成功。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/784427.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Paper Reading] Reconstructing Hands in 3D with Transformers

名称 Reconstructing Hands in 3D with Transformers 时间:CVPR2024 机构:UC Berkeley, University of Michigan, New York University TL;DR 本文提出一种使用Transformer来做Hand Tracking的算法名为HaMeR(Hand Mesh Recorvery),优势是大数据规模(利用多数据集的2D/3D标签…

“就是打一打”清华程思元打出百度之星20年史上罕见两连冠

8月18日,被称为“技术界奥斯卡”的2024百度之星程序设计大赛(以下简称“百度之星” )落下帷幕。近千人同场敲代码、中学生几乎垄断奖项、背靠背冠军诞生……百度之星像中国编程次世代的技术嘉年华,越来越多人加入其中。近千学霸角逐“编程奥斯卡”,初小学生比例创新高 百度…

企业级Scrum敏捷赋能课程:为企业量身定制的敏捷开发培训,旨在提升团队协作与项目交付能力,实现敏捷管理的落地。

​ ​ 课程简介: Scrum 是一种广泛应用的敏捷开发方法,用于项目管理和产品研发。该课程为期两天,专为研发管理者、项目经理、产品经理和研发团队设计。通过案例讲解和沙盘演练,学员将深入理解Scrum的核心理念,如产品价值驱动、以用户为中心、快速迭代和增量交付,掌握自管…

使用对比!SLS 数据加工 SPL 与旧版 DSL 场景对照

这里我们继续讨论在不同的数据处理需求中,新版数据加工 SPL 与旧版数据加工 DSL 的使用对照。对于数据同步的场景,即不需要做任何数据处理,新版 SPL 与旧版 DSL 均传入空逻辑即可,以下不再赘述。作者:灵圣 概述 如前一篇《SLS 数据加工全面升级,集成 SPL 语法》所述,SLS…

Jenkins与gitlab持续集成配置webhook报500错误

Hook execution failed: Failed to open TCP connection to localhost:8585 (Connection refused - connect(2) for "localhost" port 8585) 在点击push events进行测试Jenkins时,出现上图,是因为gitlab没有打开一项权限,做如下设置即可

在孩子成长路上,爱与智慧的陪伴是送给孩子最好的礼物

如今智能设备的普及、社交媒体的兴起和娱乐方式的碎片化,使我们时刻被各种信息和刺激包围,这种环境对正处于成长期的儿童影响尤为显著,他们更容易被快节奏、高刺激的内容吸引,导致专注力下降,从而影响学习和认知能力的发展。针对这一现实挑战,数业智能心大陆推出了AI儿童…

TXT 记录解析怎么做?

在当今数字化的时代,网络技术的应用越来越广泛,而域名系统(DNS)则是网络通信中至关重要的一部分。TXT 记录作为 DNS 中的一种记录类型,有着特定的用途和解析方法。 那么,TXT 记录解析究竟该怎么做呢? 一、了解 TXT 记录的概念 TXT 记录,即文本记录,是一种可以存储任意…

Kubernetes基础命令

Kubernetes 提供 kubectl 是使用 Kubernetes API 与 Kubernetes 集群的控制面进行通信的命令行工具。 这个工具叫做 kubectl。 针对配置信息,kubectl 在 $HOME/.kube 目录中查找一个名为 config 的配置文件。 你可以通过设置 KUBECONFIG 环境变量或设置 --kubeconfig 参数来指…

CRMEB多商户版前端页面安装依赖要点

CRMEB多商户版CRMEB_Mer_v2.3.2(20240710源码包).zip安装环境如下(很重要)平台管理-admin端安装依赖yarn install平台商户-mer端安装依赖yarn install平台客服-service端安装依赖npm install

打砖块小游戏html小游戏

这里提供一个打砖块小游戏html代码,有需要的小伙伴可以自己试试。body内容 点击查看代码 <select id="difficulty"><option value="easy">简单</option><option value="medium">中等</option><option value=&…

LLM应用实战: 产业治理多标签分类

本期的干货就是分享关于如何基于LLM实现数量多、层级多的多标签分类的实战经验,各位读者可以参考借鉴。1. 背景 许久未见,甚是想念~ 近期本qiang~换了工作,处于新业务适应期,因此文章有一段时间未更新,理解万岁! 现在正在着手的工作是产业治理方面,主要负责其中一个功能…

使用cilium开发ebpf程序

使用go开发ebpf程序最常见的一个框架就是cilium。开发前需要了解ebpf,了解go语言的基础知识。 在本地安装go之后下载bpf2go go get github.com/cilium/ebpf/cmd/bpf2go 从最简单的开发框架开始 下载示例源码 git clone https://github.com/cilium/ebpf.git 在ebpf/examples下是…