异源数据同步 → DataX 同步启动后如何手动终止?

news/2025/1/9 15:19:29/文章来源:https://www.cnblogs.com/youzhibing/p/18524122

开心一刻

刚刚和老婆吵架,气到不行,想离婚
女儿突然站出来劝解道:难道你们就不能打一顿孩子消消气,非要闹离婚吗?
我和老婆同时看向女儿,各自挽起了衣袖
女儿补充道:弟弟那么小,打他,他又不会记仇

开心一刻

需求背景

项目基于 DataX 来实现异源之间的数据离线同步,我对 Datax 进行了一些梳理与改造

异构数据源同步之数据同步 → datax 改造,有点意思
异构数据源同步之数据同步 → datax 再改造,开始触及源码
异构数据源同步之数据同步 → DataX 使用细节
异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密
异源数据同步 → DataX 为什么要支持 kafka?
异源数据同步 → 如何获取 DataX 已同步数据量?

本以为离线同步告一段落,不会再有新的需求,可打脸来的非常快,产品经理很快找到我,说了如下一段话

昨天我在测试开发环境试用了一下离线同步功能,很好的实现了我提的需求,给你点赞!
但是使用过程中我遇到个情况,有张的表的数据量很大,一开始我没关注其数据量,所以配置了全量同步,启动同步后迟迟没有同步完成,我才意识到表的数据量非常大,一查才知道 2 亿多条数据,我想终止同步却发现没有地方可以进行终止操作
所以需要加个功能:同步中的任务可以进行终止操作

这话术算是被产品经理给玩明白了,先对我进行肯定,然后指出使用中的痛点,针对该痛点提出新的功能,让我一点反驳的余地都没有;作为一个讲道理的开发人员,面对一个很合理的需求,我们还是很乐意接受的,你们说是不是?

需求一接,问题就来了

如何终止同步

思考这个问题之前,我们先来回顾下 DataX 的启动;还记得我们是怎么集成 DataX 的吗,异构数据源同步之数据同步 → datax 再改造,开始触及源码 中有说明,新增 qsl-datax-hook 模块,该模块中通过命令

Process process = Runtime.getRuntime().exec(realCommand);
realCommand 就是启动 DataX 的 java 命令,类似

java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json

来启动 DataX,也就是给 DataX 单独启动一个 java 进程;那么如何停止 DataX,思路是不是就有了?问题是不是就转换成了

如何终止 java 进程

终止进程

如何终止进程,这个我相信你们都会

Linux:kill -9 pid
Win:cmd.exe /c taskkill /PID pid /F /T

但这有个前提,需要知道 DataX 的 java 进程的 pid,而 JDK8 中 Process 的方法如下

Process方法

是没有提供获取 pid 的方法,在不调整 JDK 版本的情况下,我们如何获取 DataX 进程的 pid?不同的操作系统获取方式不一样,我们分别对 LinuxWin 进行实现

  1. Linux

    实现就比较简单了,仅仅基于 JDK 就可以实现

    Field field = process.getClass().getDeclaredField("pid");
    field.setAccessible(true);
    int pid = field.getInt(process);
    

    通过反射获取 process 实现类的成员变量 pid 的值;这段代码,你们应该都能看懂吧

  2. Win

    Win 系统下,则需要依赖第三方工具 oshi

    <dependency><groupId>com.github.oshi</groupId><artifactId>oshi-core</artifactId><version>6.6.5</version>
    </dependency>
    

    获取 pid 实现如下

    Field field = process.getClass().getDeclaredField("handle");
    field.setAccessible(true);
    long handle = field.getLong(process);
    WinNT.HANDLE winntHandle = new WinNT.HANDLE();
    winntHandle.setPointer(Pointer.createConstant(handle));
    int pid = Kernel32.INSTANCE.GetProcessId(winntHandle);
    

    同样用到了反射,还用到了 oshi 提供的方法

合并起来即得到获取 pid 的方法

/*** 获取进程ID* @param process 进程* @return 进程id,-1表示获取失败* @author 青石路*/
public static int getProcessId(Process process) {int pid = NULL_PROCESS_ID;Field field;if (Platform.isWindows()) {try {field = process.getClass().getDeclaredField("handle");field.setAccessible(true);long handle = field.getLong(process);WinNT.HANDLE winntHandle = new WinNT.HANDLE();winntHandle.setPointer(Pointer.createConstant(handle));pid = Kernel32.INSTANCE.GetProcessId(winntHandle);} catch (Exception e) {LOGGER.error("获取进程id失败,异常信息:", e);}} else if (Platform.isLinux() || Platform.isAIX()) {try {field = process.getClass().getDeclaredField("pid");field.setAccessible(true);pid = field.getInt(process);} catch (Exception e) {LOGGER.error("获取进程id失败,异常信息:", e);}}LOGGER.info("进程id={}", pid);return pid;
}

得到的 pid 是不是正确的,我们是不是得验证一下?写个 mainClass

/*** mainClass* @author 青石路*/
public class HookMain {public static void main(String[] args) throws Exception {String command = "";if (Platform.isWindows()) {command = "ping -n 1000 localhost";} else if (Platform.isLinux() || Platform.isAIX()) {command = "ping -c 1000 localhost";}Process process = Runtime.getRuntime().exec(command);int processId = ProcessUtil.getProcessId(process);System.out.println("ping 进程id = " + processId);new Thread(() -> {try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), System.getProperty("sun.jnu.encoding")))) {String line;while ((line = reader.readLine()) != null) {System.out.println(line);}} catch (IOException e) {throw new RuntimeException(e);}}).start();}
}

利用 maven 打包成可执行 jar 包

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><configuration><archive><manifest><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix><mainClass>com.qsl.hook.HookMain</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>target/lib</outputDirectory></configuration></execution></executions></plugin></plugins>
</build>

然后执行 jar

java -jar qsl-datax-hook-0.0.1-SNAPSHOT.jar

我们来看下输出结果

  1. Linux

    jar 输出日志如下

    Linux_输出

    我们 ps 下进程

    ps -ef|grep ping
    
    Linux_验证
  2. Win

    jar 输出日志如下

    win_输出

    我们再看下任务管理器的 ping 进程

    win_验证

可以看出,不管是 Linux 还是 Win,得到的 pid 都是正确的;得到 pid 后,终止进程就简单了

/*** 终止进程* @param pid 进程的PID* @return true:成功,false:失败*/
public static boolean killProcessByPid(int pid) {if (NULL_PROCESS_ID == pid) {LOGGER.error("pid[{}]异常", pid);return false;}String command = "kill -9 " + pid;boolean result;if (Platform.isWindows()) {command = "cmd.exe /c taskkill /PID " + pid + " /F /T ";}Process process  = null;try {process = Runtime.getRuntime().exec(command);} catch (IOException e) {LOGGER.error("终止进程[pid={}]异常:", pid, e);return false;}try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {//杀掉进程String line;while ((line = reader.readLine()) != null) {LOGGER.info(line);}result = true;} catch (Exception e) {LOGGER.error("终止进程[pid={}]异常:", pid, e);result = false;} finally {if (!Objects.isNull(process)) {process.destroy();}}return result;
}

完整流程应该是

  1. 使用 Runtime.getRuntime().exec(java命令) 启动 DataX,并获取到 Process

    java 命令指的是启动 DataX 的 java 命令,例如

    java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job job.json
    
  2. 通过 ProcessUtil#getProcessId 获取 Process 的 pid,并与同步任务信息绑定进行持久化

    通过任务id 可以查询到对应的 pid

  3. 触发任务 终止,通过任务id找到对应的 pid,通过 ProcessUtil#killProcessByPid 终止进程

    终止了进程也就终止了同步任务

如果 qsl-datax-hook 是单节点,上述处理方案是没有问题的,但生产环境下,qsl-datax-hook 不可能是单节点,肯定是集群部署,那么上述方案就行不通了,为什么呢?我举个例子

假设 qsl-datax-hook 有 2 个节点:A、B,在 A 节点上启动 DataX 同步任务(taskId = 666)并得到对应的 pid = 1488,终止任务 666 的请求被负载均衡到 B 节点,会发生什么情况

  1. B 节点上没有 pid = 1488 进程,那么终止失败,A、B 节点都不受影响
  2. B 节点上有 pid = 1488 进程,这个进程可能是 DataX 同步任务进程,也可能是其他进程,那么这个终止操作就会产生可轻可重的故障了!

然而需要终止的同步任务却还在 A 节点上安然无恙的执行着

所以集群模式下,我们不仅需要将 pid 与任务进行绑定,还需要将任务执行的节点信息也绑定进来,节点信息可以是 节点ID,也可以是 节点IP,只要能唯一标识节点就行;具体实现方案,需要结合具体的负载均衡组件来做设计,由负载均衡组件将任务终止请求分发到正确的节点上,而不能采用常规的负载均衡策略进行分发了;因为负载均衡组件很多,所以实现方案没法统一设计,需要你们结合自己的项目去实现,我相信对你们来说很简单

你懂我意思吧_懂

总结

  1. 任务的启动方式不同,终止方式也会有所不同,如何优雅的终止,是我们需要考虑的重点
  2. 直接杀进程的方式,简单粗暴,但不够优雅,一旦错杀,问题可大可小,如果有其他方式,不建议选择该方式
  3. 适用单节点的终止方式不一定适用于集群,大家设计方案的时候一定要做全方位的考虑
  4. 示例代码:qsl-datax-hook

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/827007.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源的 API 学习平台「GitHub 热点速览」

前有 5 万颗星标的开源项目 HTTPie 因误操作导致 Star 清零(2022 年),上周知名开源项目 Elasticsearch 也经历了 Star 一夜清零的事件。这些事故的原因均是管理员误将开源项目从公开状态转为私有状态所导致。为避免类似事件再次发生,GitHub 已在转为私有的功能处增加了两次…

一文夯实垃圾收集的理论基础

如何判断一个引用是否存活 引用计数法 给对象中添加一个引用计数器,每当有一个地方引用它,计数器就加 1;当引用失效,计数器就减 1;任何时候计数器为 0 的对象就是不可能再被使用的。 优点:可即刻回收垃圾,当对象计数为0时,会立刻回收; 弊端:循环引用时,两个对象的计…

c++实现livox-mid70/360采集、保存点云数据

c++实现livox-mid70/360采集、保存点云数据void PointCloudCallback(uint32_t handle, const uint8_t dev_type, LivoxLidarEthernetPacket* data, void* client_data) {if (data == nullptr) {return;}if (data->data_type == kLivoxLidarCartesianCoordinateHighData) {Li…

Odoo 连接ldap 域认证

附一个验证ldap的python代码。import socket from ldap3 import Server, Connection, ALLdef is_port_open(host, port):"""检查远程主机的指定端口是否开放:param host: 远程主机地址:param port: 要检查的端口号:return: 端口开放返回True,否则返回False&quo…

记从 dotnet framework 4.8 升级到 4.8.1 时运行的 dotnet remoting 程序出现空异常

本文记录一个奇怪的坑,某台用户设备从 .NET Framework 4.8 更新到 .NET Framework 4.8.1 时,所运行的 .NET Remoting 程序出现了奇怪的空异常。且重启之后不复现错误堆栈如下 System.NullReferenceException:“Object reference not set to an instance of an object.”在 Sy…

读取麒麟系统的各项版本信息

本文将记录读取麒麟系统的各项版本信息系统通用 $ uname -a Linux lindexi-pc 5.4.18-116-generic #105-KYLINOS SMP Fri Jun 21 14:09:22 UTC 2024 loongarch64 loongarch64 loongarch64 GNU/Linux$ uname -r 5.4.18-116-generic$ cat /etc/os-release NAME="Kylin"…

读数据工程之道:设计和构建健壮的数据系统28数据服务常见关注点

数据服务的常见关注点1. 使用场景 1.1. 为分析和BI,也就是统计分析、报表和仪表板提供数据服务1.1.1. 是数据服务最为常见的目标1.1.2. 这些概念的提出早于IT和数据库,但是它们对于了解业务、组织和财务流程的利益相关者来说仍然至关重要1…

RHEL9.4安装knock配置ssh

日期:2024.11.3 目的:RHEL9这台物理机打算实现两种登录方式,root只能基于key认证登录;另外一个账户可以用账号密码登录,但是登录端口不开放,通过安装knock server,顺序敲击预设的端口,才开这个能够用账号密码登录的端口。 参照:鸟哥Linux私房菜 https://linux.vbird.o…

基于Chappie-II的二次开发日志-2

紧接着上一次没做完的继续,完善settings页面功能,实现过程中顺便完善了WiFi连接和时间同步。紧接着上一次没做完的继续,完善settings页面功能,实现过程中顺便完善了WiFi连接和时间同步。 本文主要包括:Settings菜单完善多级菜单和sidebar lvgl表格 文本和滚动Esp32的smart…

给 Ollama 穿上 GPT 的外衣

上一篇我们介绍了如何在本地部署 ollama 运行 llama3 大模型。过程是相当简单的。但是现在给大模型交流只能在命令行窗口进行。这样的话就只能你自己玩了。独乐乐不如众乐乐嘛。我们接下来说一下如何部署 open-webui 给 ollama 加一个 webui,这样用户就可以通过浏览器访问我们…

18. 使用MySQL之全文本搜索

1. 理解全文本搜索 注意:并非所有引擎都支持全文本搜索: 正如第21章所述,MySQL支持几种基本的数据库引擎。并非所有的引擎都支持这里所描述的全文本搜索。 两个最常使用的引擎为MyISAM和InnoDB,前者支持全文本搜索,而后者不支持。这就是为什么虽然本书中创建的多数样例表使…

深入 Pod

kubectl 操作 Pod 的命令,探针以及 Pod 的生命周期。Author: ACatSmiling Since: 2024-11-05kubectl 命令 创建 Pod 方式一:使用配置文件创建。首先,需要创建一个 Pod 的配置文件(通常是.yaml或.yml格式)。例如,创建一个简单的 Nginx Pod 配置文件 nginx-pod.yaml: apiV…