Flink构造宽表实时入库案例介绍

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

查看可以成功入库

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

显示已经成功入库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/336166.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于python热门旅游景点推荐系统+爬虫技术

大数据分析,数据可视化等皆可用。 源码分享。

esp32UART串口外设(Arduino)

通用异步接收器/发送器 (UART) 介绍 通用异步接收器/发送器 (UART) 是一种硬件功能,它使用广泛采用的异步串行通信接口(如 RS232、RS422 和 RS485)处理通信(即时序要求和数据成帧&…

Underactuated Robotics - 欠驱动机器人学(二)- 简单摆杆

系列文章目录 前言 一、导言 本章的目标并不高,我们只想了解钟摆的动力学原理。 为什么是摆呢?部分原因是,我们大多数多连杆机器人操纵器的动力学都是大量耦合摆的动力学。此外,单摆的动力学原理非常丰富,足以引入我…

JVM主要的几种垃圾回收算法

1、Java 为什么要实现自动内存管理 ? 简化开发过程:通过内存自动管理可以避免手动分配和释放内存的麻烦,减少了内存泄漏和内存错误的风险,让研发能更专注于业务逻辑,不必纠结于内存管理的细节。 提高开发效率&#xff…

【Python程序开发系列】一文总结API的基本概念、功能分类、认证方式、使用方法和开发流程

这是Python程序开发系列原创文章,我的第195篇原创文章。 一、什么是API? API是软件开发中非常重要的概念,它简化了不同组件之间的交互和集成,提供了对其他软件或服务功能的访问和调用方式。 API是应用程序编程接口(Ap…

“To-Do Master“ GPTs:重塑任务管理的趣味与效率

有 GPTs 访问权限的可以点击链接进行体验:https://chat.openai.com/g/g-IhGsoyIkP-to-do-master 部署私人的 To-Do Master 教程:https://github.com/Reborn14/To-Do-Master/tree/main 引言 在忙碌的日常生活中,有效地管理日常任务对于提高生…

考虑柔性负荷的综合能源系统低碳经济优化调度【复现】

随着低碳发展进程的不断推进,综合能源系统(IES)逐渐成为实现减排目标的重要支撑技术。 基于能 源集线器概念,结合需求侧柔性负荷的可平移、可转移、可削减特性,构建了含风光储、燃气轮机、柔性负荷等 在内的 IES 模型。…

IDEA中在Java项目中添加Web模块 与配置tomcat服务器

现有项目添加直接走第二步 生成普通新项目 给项目添加框架支持 勾选 Web Application 选项, 点击OK 得到项目目录结构 , 出现web目录结构, 且web目录文件夹出现小蓝点 web或webapp 没有出现小蓝点 说明web配置没有出现或是手动构建的目录结构 , 在IDE关闭或者迁移项目时会出…

自承载 Self-Host ASP.NET Web API 1 (C#)

本教程介绍如何在控制台应用程序中托管 Web API。 ASP.NET Web API不需要 IIS。 可以在自己的主机进程中自托管 Web API。 创建控制台应用程序项目 启动 Visual Studio,然后从“开始”页中选择“新建项目”。 或者,从“ 文件 ”菜单中选择“ 新建 ”&a…

[VSCode] VSCode 常用快捷键

文章目录 VSCode 源代码编辑器VSCode 常用快捷键分类汇总01 编辑02 导航03 调试04 其他05 重构06 测试07 扩展08 选择09 搜索10 书签11 多光标12 代码片段13 其他 VSCode 源代码编辑器 官网:https://code.visualstudio.com/ 下载地址:https://code.visua…

【Android Studio】APP练手小项目——切换图片APP

本项目效果: 前言:本项目最终实现生成一个安卓APP软件,点击按钮可实现按钮切换图片。项目包含页面布局、功能实现的逻辑代码以及设置APP图标LOGO和自定义APP名称。 关于Android Studio的下载与安装见我的博文:Android Studio 最新…

Docker简介、基本概念和安装

Docker简介、基本概念和安装 1.docker简介 1.1 什么是docker Docker 最初是 dotCloud 公司创始人 Solomon Hykes (opens new window)在法国期间发起的一个公司内部项目,它是基于 dotCloud 公司多年云服务技术的一次革新,并于 2013 年 3 月以 Apache 2…