1.组件版本
| 组件 | 版本 |
| --- | --- |
| Kafka | 3.7.0 |
| Flink | 1.17.0 |
| MySQL | 8.0.32 |
2.Kafka生产数据
# Start an interactive console producer for topic kafka_test_table2.
# --bootstrap-server replaces the deprecated --broker-list option.
./kafka-console-producer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_test_table2
>{"id":123,"test_age":33}
>{"id":125,"test_age":28}
>{"id":126,"test_age":18}
3.Kafka消费数据
# Read topic kafka_test_table2 from the beginning to verify the produced records.
# Brokers in --bootstrap-server must be comma-separated host:port pairs.
./kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_test_table2 --from-beginning
4.MySQL创建表
-- MySQL target table that the Flink JDBC table (table-name = test_table2) writes into.
-- Integer display widths such as int(11) and the utf8 charset alias are deprecated
-- in MySQL 8.0, so plain INT and utf8mb4 are used instead.
-- IF NOT EXISTS lets this step be re-run without an error.
CREATE TABLE IF NOT EXISTS `test_table2` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `test_age` INT DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
5.Flink 启动任务
# Work from the Flink 1.17.0 bin directory; the scripts below are invoked relative to it.
cd /data/flink-1.17.0/bin/
# Start the Flink cluster before opening the SQL client.
./start-cluster.sh
# Open the interactive Flink SQL client; the SQL statements that follow are typed at its prompt.
./sql-client.sh
-- Print query results inline in tableau format.
-- The Flink SQL Client expects the key and the value as quoted string literals.
SET 'sql-client.execution.result-mode' = 'tableau';
-- Kafka-backed table: reads JSON records from topic kafka_test_table2,
-- starting at the earliest offset, using consumer group group01.
CREATE TABLE kafka_test_table2 (
    id       INT,
    test_age INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_test_table2',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'hadoop01:9092,hadoop02:9092,hadoop03:9092',
    'properties.group.id' = 'group01',
    'format' = 'json'
);
-- JDBC-backed table mapped to MySQL table test_table2 in database test.
-- The primary key is declared NOT ENFORCED.
-- NOTE(review): with a primary key declared, the JDBC sink is expected to upsert
-- rather than append; confirm against the connector documentation.
CREATE TABLE rds_test_table2 (
    id       INT,
    test_age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.132.22:3306/test?serverTimezone=Asia/Shanghai',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'Root@1234',
    'table-name' = 'test_table2'
);
-- Submit the streaming job that copies Kafka records into the MySQL-backed table.
-- Columns are listed explicitly so a schema change on either table cannot
-- silently shift the column mapping.
INSERT INTO rds_test_table2 (id, test_age)
SELECT
    id,
    test_age
FROM kafka_test_table2;
6.Flink WebUI
登录Flink WebUI页面,查看任务是否启动成功
点击查看任务是否报错
如果任务报错 "The server time zone value '�й���ʱ��' is unrecognized or represents more than one time zone"(其中的乱码是"中国标准时间"因编码不一致而显示异常),在 MySQL 的 url 连接字符串中添加 ?serverTimezone=Asia/Shanghai 即可解决。
7.MySQL验证
登录MySQL数据库,查看数据是否同步成功。