异构数据源同步之数据同步 → datax 改造,有点意思

news/2024/12/30 2:39:04/文章来源:https://www.cnblogs.com/youzhibing/p/18200810

开心一刻

去年在抖音里谈了个少妇,骗了我 9 万

后来我发现了,她怕我报警

她把她表妹介绍给我

然后她表妹又骗了我 7 万

DataX

DataX 是什么,有什么用,怎么用

不做介绍,大家自行去官网(DataX)看,Gitee 上也有(DataX)

不服气吗

你们别不服,我这是为了逼迫你们去自学,是为了你们好!

文档很详细,也是开源的,我相信你们都能看懂,也能很快上手用起来

那这篇文章到此结束,大家各自去忙吧

但是等等,我想带你们去改造改造datax

挺有意思的,我们慢慢往下看

去 Python

根据官方的 Quick Start

系统要求

是依赖 Python 来启动的

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

如果要去掉 Python 依赖,你们会怎么做?

是不是梳理清楚 datax.py 的代码逻辑就行了?

datax.py

这个代码不长,但是如果没有一点 Python 底子,datax.py 是看不懂的

所以我们换个方式,去寻找我们需要的信息就行了

DataX 的业务代码是 java 实现的,然后你们再往上看看 System Requirements

你们觉得该如何启动 JVM 进程来执行 DataXjava 代码?

是不是只能用 JDKjava 命令了?

所以我们直接在 datax.py 中搜索 java 即可

你们会发现只有如下这一行表示 java 命令

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)

Python 中的 % 就相当于 java 中的 String.format 方法

python %

也就说,datax.py 是通过 java -server 命令来启动 JVM 进程的

那么我们是不是可以绕过 Python,直接在 cmd 调用 java -server 来启动了?

java -server

这个命令还真不眼熟,因为我们接触到的往往是 java -jar

我们用 java -h 看下 java 命令的说明

java -h

发现了什么?

-serveroption 之一,与 -jar 并不是 非此即彼 的关系

所以不要去拿 java -serverjava -jar 做对比了,没意义!!!

在Java中,JVM有两种运行模式:客户端模式和服务器模式。这两种模式是为了优化不同场景下的JVM性能而设计的。服务器模式:这种模式适用于长时间运行的应用程序,如Web服务器或数据库服务器。服务器模式下的JVM会进行更多的优化,以减少长时间运行的性能开销。例如,它会进行更深入的即时编译(JIT compilation),以提高代码的执行效率。客户端模式:默认情况下,JVM运行在客户端模式。这种模式适用于较短时间运行的应用程序,如桌面应用或命令行工具。客户端模式下的JVM会更快地启动,但可能不如服务器模式那样高效。使用-server选项启动JVM时,您告诉JVM在服务器模式下运行。这通常意味着JVM将使用更多的系统资源,但可以提供更好的性能,特别是在长时间运行的应用程序中

我们先下载 DataX 工具包

datax工具包

解压之后,我的 DataX 的根目录是:G:\datax-tool\datax

datax工具home目录

我们不通过 datax.py 来启动,而是直接在 cmd 下通过 java 命令来启动

java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\datax-tool\datax\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\datax-tool\datax -Dlogback.configurationFile=G:\datax-tool\datax\conf\logback.xml -classpath G:\datax-tool\datax\lib\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\datax-tool\datax\job\job.json

注意:上述 java 命令中的相关路径需要替换成你们自己的路径!

不出意外的话,会执行成功

java命令启动

为什么依赖 Python

如果你们去看了 DataX 工具包的目录结构,或者 DataX 的源码

你们会发现 DataX 就是用 java 实现的,Python 仅仅只是作为一个启动脚本(另外两个脚本你们自己去研究)

启动脚本

仅仅为了一个启动,而这个启动又不是非 Python 不可,就引入了 Python 环境依赖,试问这合理吗?

不合理

不要急着下结论,我们理智分析一波

DataX 正式投入使用的时候,会部署到什么系统上,请你们大声的告诉我

linux

不说全部,绝大部分是部署在 Linux 上,对此我相信你们都没异议吧

那么重点来了:目前主流的 Linux 系统,都自带 Python !!!

也就是不用再额外的是安装 Python,直接可以用,那为什么不用呢?

那如果是部署在 Windows 上,而又不想安装 Python,该如何启动了?

如果你们还能问出这样的问题,我只想给你们来上一枪

老子一枪崩了你

前面不是刚讲吗,在 cmd 直接用 java 命令来启动 DataX 不就行了?

java 启动 DataX

说的更详细点,是通过 java 代码去启动 DataX JVM 进程

我相信你们都会,直接上代码

private static final String DATAX_COMMAND = "java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\\datax-tool\\datax\\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\\datax-tool\\datax -Dlogback.configurationFile=G:\\datax-tool\\datax\\conf\\logback.xml -classpath G:\\datax-tool\\datax\\lib\\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\\datax-tool\\datax\\job\\job.json";public static void main(String[] args) {try {Process process = Runtime.getRuntime().exec(DATAX_COMMAND);// 等待命令执行完成int i = process.waitFor();if (i == 0) {System.out.println("job执行完成");} else {System.out.println("job执行失败");}} catch (Exception e) {throw new RuntimeException(e);}
}

是不是很简单?

执行下,你会发现卡住了!!!

卡住

出师不利呀,要不放弃?

放弃

Runtime 对象调用 exec(cmd) 后,JVM 会启动一个子进程,该进程会与 JVM 进程建立三个管道连接:标准输入标准输出标准错误流

假设子进程不断在向标准输出流和标准错误流写数据,而 JVM 进程不读取的话,当缓冲区满之后将无法继续写入数据,最终造成阻塞在 waitfor()

所以改造下就好了

private static final String SYSTEM_ENCODING = System.getProperty("sun.jnu.encoding");
private static final String DATAX_COMMAND = "java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=G:\\datax-tool\\datax\\log -Dfile.encoding=GBK -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=G:\\datax-tool\\datax -Dlogback.configurationFile=G:\\datax-tool\\datax\\conf\\logback.xml -classpath G:\\datax-tool\\datax\\lib\\* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job G:\\datax-tool\\datax\\job\\job.json";public static void main(String[] args) {try {Process process = Runtime.getRuntime().exec(DATAX_COMMAND);// 另启线程读取new Thread(() -> {try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), SYSTEM_ENCODING))) {String line;while ((line = reader.readLine()) != null) {System.out.println(line);}} catch (IOException e) {throw new RuntimeException(e);}}).start();new Thread(() -> {try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream(), SYSTEM_ENCODING))) {String line;while ((line = errorReader.readLine()) != null) {System.out.println(line);}} catch (IOException e) {throw new RuntimeException(e);}}).start();// 等待命令执行完成int i = process.waitFor();if (i == 0) {System.out.println("job执行完成");} else {System.out.println("job执行失败");}} catch (Exception e) {throw new RuntimeException(e);}
}

还是比较简单的吧,相信你们都能看懂

总结

  • DataX 是进程级别的,而 Job 下的 Task 是线程级别的

    image-20240519201433065

    为什么 DataX 要实现成进程级别,而不是线程级别?

    小数据量的同步,实现方式往往很多

    但大数据量的同步,情况就不一样了,那么此时进程和线程的区别还大吗

  • Linux 系统基本自带 Python 环境,所以大家不要再纠结为什么依赖 Python

    去掉 Python 依赖也很简单,文中已有演示

  • DataX + datax-web 这个组合已经基本够用

    datax-web 基于 XXL-JOB,基本满足我们日常的调度要求了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/709073.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sdf 测试-2-openssl

在openEuler(推荐)或Ubuntu或Windows(不推荐)中完成下面任务,参考网内容 和AI要给出详细过程,否则不得分。 0. 根据gmt0018标准,如何调用接口实现基于SM3求你的学号姓名的SM3值?(5‘)使用OpenSSL实现SDF接口中的hash运算接口,至少支持SM3算法,把相关函数集成到src中的…

阅读习惯2

本学期阅读成果总结阅读书单选择与总结: 我选择了参考的书单链接(https://weread.qq.com/misc/booklist/3107758_7sb8Fs2Hv),并从中挑选了几本书开始阅读。阅读数据截图:总时长:250 小时 册数:15 本 笔记数:47 条本学期的收获:阅读时长增加:本学期我总共阅读了250小时…

lodash已死?radash库方法介绍及源码解析 —— 函数柯里化 + Number篇

点赞 + 收藏 = 学会! 本篇我们介绍radash中函数柯里化和Number 相关的方法使用和源码解析。深入学习radash中的方法思想和底层实现。写在前面 tips:点赞 + 收藏 = 学会!主页有更多其他篇章的方法,欢迎访问查看。 本篇我们继续介绍radash中函数柯里化和Number 相关的方法使用…

sdf 测试-1-龙脉智能钥匙

在openEuler(推荐)或Ubuntu或Windows(不推荐)中完成下面任务,参考网内容 和AI要给出详细过程,否则不得分。 0. 根据gmt0018标准,推导sdf的接口调用模式,比如调用SDF_GenerateRandom,还应调用其他什么函数,调用顺序是什么,给出结论和推导过程。(10‘)使用龙脉智能钥匙…

sdf 测试-1

在openEuler(推荐)或Ubuntu或Windows(不推荐)中完成下面任务,参考网内容 和AI要给出详细过程,否则不得分。根据gmt0018标准,推导sdf的接口调用模式,比如调用SDF_GenerateRandom,还应调用其他什么函数,调用顺序是什么,给出结论和推导过程。(10‘) 使用龙脉智能钥匙定义一…

Spring 对于事务上的应用的详细说明

1. Spring 对于事务上的应用的详细说明 @目录1. Spring 对于事务上的应用的详细说明每博一文案2. 事务概述3. 引入事务场景3.1 第一步:准备数据库表3.2 第二步:创建包结构3.3 第三步:准备对应数据库映射的 Bean 类3.4 第四步:编写持久层3.5 第五步:编写业务层3.6 第六步:…

日常Bug排查-偶发性读数据不一致

日常Bug排查-偶发性读数据不一致 前言 日常Bug排查系列都是一些简单Bug的排查。笔者将在这里介绍一些排查Bug的简单技巧,同时顺便积累素材。 Bug现场 业务场景 先描述这个问题出现的业务场景。这是一个支付的场景,如果支付成功了,我们就把支付状态置为success(主单据更新)同…

多线程和多进程 - 初窥

一、说明 在平常工作中,我们使用top命令查看一台linux服务器的cpu使用情况时,会发现某个进程的cpu使用率会超过100%,这是为什么? 二、举例 实验环境为 CentOS7.6 + Python2.7 1. 多线程、多进程在操作系统中的表现形式 我们首先看两个例子,test1.py和test2.py,都是执行死…

EDP .Net开发框架--权限

EDP是一套集组织架构,权限框架【功能权限,操作权限,数据访问权限,WebApi权限】,自动化日志,动态Interface,WebApi管理等基础功能于一体的,基于.net的企业应用开发框架。通过友好的编码方式实现数据行、列权限的管控。平台下载地址:https://gitee.com/alwaysinsist/edp…

两台数据库在数据写入时性能的差异

介绍:我有两台数据库,分别称为200和203,200和203的服务器性能配置相当,203的配置甚至还要好一点。都是安装的centos7.7,oracle 19C,均已开日志归档,这两台服务器在同一个机房,同一个网段。当我在本地使用JDBC去往这两个数据库分别插入10w条记录,每插入一条提交一次,2…

【一步步开发AI运动小程序】十七、如何识别用户上传视频中的运动、动作、姿态?

【云智AI运动识别小程序插件】,可以为您的小程序,赋于人体检测识别、运动检测识别、姿态识别检测AI能力。本地原生识别引擎,内置10余个运动,无需依赖任何后台或第三方服务,有着识别速度快、体验佳、扩展性强、集成快、成本低的特点,本篇实现需要使用此插件,请先行在微信…