使用SeaTunnel从InfluxDB同步数据到Doris

news/2024/12/29 13:04:16/文章来源:https://www.cnblogs.com/seatunnel/p/18529439

本文介绍了如何使用SeaTunnel将数据从InfluxDB同步到Doris。通过SeaTunnel强大的数据集成功能,用户可以高效地将存储于InfluxDB中的时间序列数据传输至Doris,便于数据的访问与分析。

版本信息:
SeaTunnel 2.3.3
InfluxDB 2.7.6
Doris 2.1.3 rc09

准备事项

SeaTunnel2.3.3的安装过程这里就省略了,可以参考官网文档。

SeaTunnel2.3.3安装好以后需要删掉两个连接用的jar包,不然后面同步数据库会报错:connector-hudi-2.3.3.jarconnector-datahub-2.3.3.jar.

需要增加的jar包:seatunnel-api-2.3.3.jarseatunnel-transforms-v2-2.3.3.jarmysql-connector-java-8.0.28.jarjersey-client-1.19.4.jar,这四个jar包必须添加,不然无法同步数据运行同步脚本直接报错没有某个类。

InfluxDB 2.7.6 需要做的前提事项:下面这个步骤必须要做,不然查不到数据,

InfluxDB Studio-0.2.0(这个客户端工具有个好处,可以查看字段类型,方便同步文件中的字段类型的定义,其他的客户端好像没有,也有可能是我没发现),可下载这个客户端进行连接查询数据。

Linux安装influxDB 2.7.6版本后,正常使用ip:8086 可访问influxdb UI,填写用户名、密码、org、buckets。

同步过程及踩坑点

SeaTunnel 2.3中集成InfluxDB配置用户名、密码后,执行同步任务总是报获取字段异常信息。

于是乎跟踪SeaTunnel代码,发现内部一直401权限认证失败。于是使用InfluxDB Studio数据库管理工具连接,输入ui页面相同的用户名,密码后一直报401权限认证不通过。通过查资料发现ui页面的用户名密码仅供ui页面使用,不能作为数据库本身访问的用户名密码。

使用iInfluxDB client客户端,查询权限influx v1 auth list结果为空。

使用命令分配权限
influx v1 auth create -o orgName --read-bucket bucketId --username=username
或者:influx v1 auth create -o "组织名称" --write-bucket bucketId(桶id,不需要引号) --read-bucket bucketId(桶id,不需要引号) --username=账号 --password=密码

删除命令:influx v1 auth delete --id 'id编码'

删除命令中的id编码为influx v1 auth list命令查出来的ID,下图所示:

file

命令执行完成后需输入两次密码。InfluxDB Studio数据库管理工具再次使用此用户名密码登录成功,SeaTunnel同步成功。

同步数据配置文件:v1.batch.config_tmp.template:

env {execution.parallelism = 1job.mode = "BATCH"checkpoint.interval = 10000
}source {influxdb {url = "http://X.X.X.X:8086"token = "写自己的token" #可有可无org = "自己的组织名称"bucket = "自己的桶" #可有可无database = "自己的桶"username = "写在第四步自己新建的influxdb账号"password = "写在第四步自己新建的influxdb密码"epoch = "H" #这个有好几级,可以去官网查看query_timeout_sec = 600measurement = "prometheus_remote_write" #数据表fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] #可有可无,配置自己的字段sql = """SELECT node_cpu_seconds_total as system_cpu_usage,cpu as process_occupy_physical_memory_size,job as create_dept,node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes,node_memory_MemAvailable_bytes as process_open_file_describe_quantity,time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""where = " where time > now() - 1h" #经过本人测试。上面的sql查询的字段必须经过重命名,或者doris建表的字段必须和influxdb2的字段完全一致,不然transform 中进行转换的时候就会成为空值,这个我还没研究明白为什么,研究明白了在补上说明,doris的表字段类型也必须和influxdb2中查询的字段类型一致,不然数据存不到doris中。schema 重定义的事influxdb2查到的字段和类型schema {fields {#node_cpu_seconds_total = FLOATsystem_cpu_usage = FLOATprocess_occupy_physical_memory_size = INTcreate_dept = STRINGprocess_read_written_file_system_total_bytes = FLOATprocess_open_file_describe_quantity = FLOATcreate_time = BIGINT}}}
}sink {Doris {fenodes = "X.X.X.X:8030"username = "账号"password = "密码"table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"sink.label-prefix = "test-cdc"sink.enable-2pc = "true"sink.enable-delete = "true"sink.max-retries = 3batch_size = 10000result_table_name = "sbyw_application_process_type_tmp"doris.config {format = "json"read_json_by_line = "true"}}
}transform {FieldMapper {source_table_name = "prometheus_remote_write"result_table_name = "sbyw_application_process_type_tmp"field_mapper = {#node_cpu_seconds_total = system_cpu_usagesystem_cpu_usage = system_cpu_usageprocess_occupy_physical_memory_size = process_occupy_physical_memory_sizeprocess_read_written_file_system_total_bytes = process_read_written_file_system_total_bytesprocess_open_file_describe_quantity = process_open_file_describe_quantitycreate_time = create_timecreate_dept = create_dept}}
}

写好同步数据脚本文件运行同步命令:./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template

下面是我Doris的测试表:

file

下面是InfluxDB Studio-0.2.0客户端查到 InfluxDB 2.7.6的数据:

file

InfluxDB 2.7.6有个坑点,它支持sql查询,但不完全支持,它只支持常规的简单查询,例如下图中的查询就可以查询,但是如下图所示,可能会有人说我后面没加group by,经过测试是不行的,即使加上group by也是无法执行,那是因为官方压根不支持的这种查询。

file

但是下图这样是可以的,InfluxDB 2官方就是这样设计的,聚合查询无法和单字段进行同步查询。

file

最后是运行结果:

file

同步到Doris的数据:

file

原文链接:https://blog.csdn.net/2401_84562349/article/details/140919192

本文由 白鲸开源 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/827530.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CMU_15445_P2_Extendible_Hash_Table

到Project2, 我们依然在处理数据库存储相关的部分, 从 Project1 中我们应该Get到两个概念:数据库底层数据操作的基本单元是 Page. buffer_pool_manager 是管理以及组织数据单元Page的工具, 在Project2的第一部分, 我们还新增了页面守护(PageGuard)的机制更加优雅的获取以及释放…

关于pacman更新时出现error: GPGME error: No data 解决方法

问题复现 基本上我隔一段时间就会出现这个问题,每一次都是在网络上寻找相关命令来解决,但是却不明白为什么会出现这个问题。 问题大概是这样的但是有一位博主详细的帮忙解答了问题,大概的意思是指:pacman 在更新数据库文件时会尝试下载每个仓库的 .db.sig 文件,这是数据库…

Zabbix7.0教程:新增Browser监控项

1 前言 Zabbix 7.0.0版本之后,增加了“Browser”监控项类型,即浏览器监控,能够使用浏览器监控复杂的网站和web应用。 浏览器监控项允许执行用户定义的JavaScript代码来模拟与浏览器相关的操作,例如点击、输入文本、网页导航等。该监控项通过HTTP/HTTPS收集数据,并部分…

ABB机器人维修示教器线缆常见的故障现象

在工业自动化领域,ABB机械手是一款广泛应用的设备,其示教器线缆的正常运行对于机械手的稳定工作至关重要。然而,在实际使用过程中,示教器线缆可能会出现各种故障,影响机械手的正常操作。本文将介绍ABB机器人示教器线缆常见的故障现象及相应的ABB机器人维修解决方案。 一、…

[GXYCTF2019]Ping Ping Ping 1 - Xxiaoma解题

打开后,发现显示一个 /?ip= 我们直接在url里添加/?ip=127.0.0.1,发现有回显,这时候就可以用分号;来进行命令链接执行了。 输入url/?ip=127.0.0.1;ls 回显出来两个php文件!直接构造payload:/?ip=127.0.0.1;cat /flag.php才发现没那么简单,回显过滤了空格,可以使用$IFS…

manim边学边做--三维的点和线

Manim 提供了一系列专为三维空间设计的对象,让创建三维数学动画变得更加轻松。 本篇开始介绍其中最简单的点和线相关对象,也就是Dot3D(三维的点),Line3D(三维的线)和Arrow3D(三维的箭头)。Dot3D用于表示三维空间中的点,是构建其他复杂三维图形的基础,它适用于标记关…

洛谷题单指南-二叉堆与树状数组-P2168 [NOI2015] 荷马史诗

原题链接:https://www.luogu.com.cn/problem/P2168 题意解读:把单次替换成k进制字符串,使得替换后文本内容最短,典型的哈夫曼编码应用。 解题思路: 要把单词转成k进制字符串,根据哈夫曼编码的原理,可以依次将k个出现次数最少的单词进行合并,最后得到一棵树,每个非叶节…

.Net Core NPOI 导出多级表头

想要导出这样的表格 数据准备格式附上源码1 using NPOI.HSSF.UserModel;2 using NPOI.SS.UserModel;3 using NPOI.SS.Util;4 using System.Data;5 using System.Text.RegularExpressions;6 7 namespace TestConsoleApp8 {9 /// <summary>10 /// 导出Excel11 …

Nacos原理汇总

今天就应某位小伙伴的要求,来讲一讲Nacos作为服务注册中心底层的实现原理不知你是否跟我一样,在使用Nacos时有以下几点疑问:临时实例和永久实例是什么?有什么区别? 服务实例是如何注册到服务端的? 服务实例和服务端之间是如何保活的? 服务订阅是如何实现的? 集群间数据…

人工智能模型训练中的数据之美——探索TFRecord

上一篇:《构建人工智能模型基础:TFDS和Keras的完美搭配》 序言:在人工智能模型的训练过程中,如何高效管理和处理大量数据是一个重要的课题。TensorFlow 的 TFRecord 格式为大规模数据存储和处理提供了一种灵活且高效的解决方案。在本节知识中,我们将介绍如何利用 TFRecord…

教师提前批试讲-注意事项

教师提前批试讲-注意事项

大白菜装系统

在平时工作中,作为程序员,最苦逼的是公司的电脑坏了,都找你。我想说我是程序员,不是修电脑的,但是架不住小姐姐的热情,还是做了。 装系统流程: 第一步:前期准备 1、使用【大白菜】制作U盘启动盘。 2、查询机型的U盘启动快捷键。 3、准备一个ISO/GHO镜像。 第二步:插入…