seatunnel数据集成(三)多表同步

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


seatunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例。

1、单表 to 单表

一个source,一个sink

env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

2、单表 to 多表

一个source,多个sink

env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf

3、多表 to 单表

多个source,一个sink

表结构:

-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf

4、多表 to 多表

多个source,多个sink

env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/457567.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot配置文总结

官网配置手册 官网:https://spring.io/ 选择SpringBoot 选择LEARN 选择 Application Properties 配置MySQL数据库连接 针对Maven而言,会搜索出两个MySQL的连接驱动。 com.mysql mysql-connector-j 比较新,是在mysql mysql-connect…

vue3:24—组件通信方式

1、props 子组件也可以如下调用父组件的方法 2、自定义事件 (emit) 3、mitt(任意组件的通讯) 1. pubsub 2. $bus 3. mitt 接收数据的:提前绑定好事件(提前订阅消息)提供数据的:在合适的时候触发事件发布消息) 安装mitt npm i…

机试复习-3

前言:前面耽误太多时间,2月份是代码月,一定抓紧赶上,每天至少两道题 day1 2024.2.6 1.排序开启: 1.机试考试:排序应用考察 c的qsort c的sort 作用:对数组,vector排序&#…

项目经理怎么处理客户提出的不合理请求?

一、客户不合理请求的定义和特点 客户不合理请求是指客户在项目执行过程中提出的与项目需求、合同约定或者实际情况不符的要求,通常表现为追加要求、频繁的变更、过度的要求等。这些请求可能会导致项目范围膨胀、成本增加、工期延长、甚至影响项目进度和质量。客户…

前端基础复习(后端人员看前端知识)

企业级前端项目开发中,需要将前端开发所需要的工具、技术、流程、经验进行规范化和标准化,而不是零散的html、js、css文件堆叠在一起。 首先我们需配置前端的开发基础环境NodeJS,相当于后端人员java开发的JDK。然后搭建前端工程脚手架Vue-cl…

Golang GC 介绍

文章目录 0.前言1.发展史2.并发三色标记清除和混合写屏障2.1 三色标记2.2 并发标记问题2.3 屏障机制Dijkstra 插入写屏障Yuasa 删除写屏障混合写屏障 3.GC 过程4.GC 触发时机5.哪里记录了对象的三色状态?6.如何观察 GC?方式1:GODEBUGgctrace1…

LLaMA 模型中的Transformer架构变化

目录 1. 前置层归一化(Pre-normalization) 2. RMSNorm 归一化函数 3. SwiGLU 激活函数 4. 旋转位置嵌入(RoPE) 5. 注意力机制优化 6. Group Query Attention 7. 模型规模和训练超参数 8. 分布式模型训练 前置归一化与后置…

Linux的打包压缩与解压缩---tar、xz、zip、unzip

最近突然用到了许久不用的压缩解压缩命令,真的陌生, 哈哈,记录一下,后续就不用搜索了。 tar的打包 tar -cvf 压缩有的文件名称 需要压缩的文件或文件夹tar -cvf virtualbox.tar virtualbox/ tar -zcvf virtualbox.tar virtualbo…

【SpringBoot】application配置(5)

type-aliases-package: com.rabbiter.cm.domaintype-aliases-package: 这个配置用于指定mybatis的别名,别名是一个简化的方式,让你在Mapper xml 文件中引用java类型,而不需要使用使用完整的类名。例如,如果你在 com.rabbiter.cm.d…

MybatisPlus快速入门及常见设置

目录 一、快速入门 1.1 准备数据 1.2 创建SpringBoot工程 1.3 使用MP 1.4 获取Mapper进行测试 二、常用设置 2.1 设置表映射规则 2.1.1 单独设置 2.1.2 全局设置 2.2 设置主键生成策略 2.2.1 为什么会有雪花算法? 2.2.2 垂直分表 2.2.3 水平分表 2.…

Python进阶--下载想要的格言(基于格言网的Python爬虫程序)

注:由于上篇帖子(Python进阶--爬取下载人生格言(基于格言网的Python3爬虫)-CSDN博客)篇幅长度的限制,此篇帖子对上篇做一个拓展延伸。 目录 一、爬取格言网中想要的内容的url 1、找到想要的内容 2、抓包分析,找到想…

Verilog刷题笔记20

题目: Case statements in Verilog are nearly equivalent to a sequence of if-elseif-else that compares one expression to a list of others. Its syntax and functionality differs from the switch statement in C. 解题: module top_module ( …