如何使用 DataX 连接 Easysearch

news/2025/2/12 12:03:41/文章来源:https://www.cnblogs.com/infinilabs/p/18706285

DataX

DataX 是阿里开源的一款离线数据同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

本篇主要介绍 DataX 如何将数据写入到 Easysearch,对于各种数据源的连接不会做深入的探讨,感兴趣的小伙伴可以访问 DataX 的 Github 仓库查看详情。

下载与安装

DataX 无需安装,下载后解压即可使用。

系统需求:

  1. JDK 1.8 及以上
  2. Python2 或 3

创建任务配置文件

每个数据同步的操作可称为一个任务,任务的配置文件定义了数据源(reader)、数据目的(writer) ,以及任务的设置信息,如并发数、速度控制等。DataX 集成了如此多的数据源,如果靠纯手工编写任务配置显然不现实。官方也出了个命令可以根据指定的数据源和数据目的帮助大家生成任务配置。

python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

测试配置文件

此次演示使用 streamreader 和 elasticsearchwriter 作为数据源和数据目的,任务配置如下:

{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"sliceRecordCount": 10000,"column": [{"type": "long","value": "10"},{"type": "string","value": "hello,你好,世界-DataX"},{"type": "string","value": "hello,你好,Easysearch"}]}},"writer": {"name": "elasticsearchwriter","parameter": {"endpoint": "http://localhost:9200","accessId": "admin","accessKey": "1ef0c661d8562aaa06be","index": "yf-test","column": [{ "name": "no", "type": "long" },{ "name": "content", "type": "keyword" },{ "name": "content2", "type": "keyword" }]}}}],"setting": {"speed": {"channel": 50}}}
}

streamreader 是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。

我们用 streamreader 构造了 10000 个文档,文档含三个字段,任务启动了 50 个 channel 进行数据发送,结果就是共计发送 50w 个文档。

elasticssearchwriter 指定了 Easysearch 的连接信息:

  • endpoint: Easysearch 的地址和端口
  • accessId: 用户名
  • accessKey: 密码
  • index: 写入索引名
  • column: 对 reader 发来数据的 schema 定义
  • batchsize: 默认 1000

这次我们 Easysearch 开启的 http 服务,因为 DataX 的 elasticsearchwriter 无法跳过证书验证。对于必须使用 https 的场景,可使用 INFINI Gateway 代理 ES 服务,提供 http 通道给离线数据同步专用。

⚠️ 注意:

不同的 reader、writer 对 sliceRecordCount 和 channel 会有不同的行为。

Easysearch

本次测试使用的 Easysearch 版本是 1.9.0,需要注意是 Easysearch 要开启兼容性参数:

elasticsearch.api_compatibility: true

否则创建索引报错退出。(实际索引创建成功了但是 mapping 信息是空的)

运行任务

编辑好任务配置文件后,下一步就是执行任务。

python3 datax.py yf-test.json

写入数据时索引不存在,Datax 根据 schema 定义创建了索引。

OK 任务执行完毕,写入 50w 个文档耗时 10 秒。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/881286.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机组成原理图解趣味版《程序是怎样跑起来的(第3版)》PDF、EPUB免费下载

计算机组成原理图解趣味版,蹲马桶便能看懂的计算机和编程原理入门知识,网络是怎样连接的,计算机是怎样跑起来的姊妹篇,日文版重印41次 单册数量100本以上。 本书从计算机的内部结构开始讲起,以图配文的形式详细讲解了二进制、内存、数据压缩、源文件和可执行文件、操作系统…

自然语言处理与词嵌入

自然语言处理与词嵌入传统的词汇使用词汇表(Vocabulary)来存储,并用one-hot向量表示,向量长度等于词汇表大小,每个单词对应一个独特的索引,只有索引处的值为1,其余全部为0。如果 “max” 在词汇表里的索引是5391,那么对应的one-hot向量为 \(O_{5391}\)。 但是这种表示方法…

[20052006-ptz] Decoding Martian Messages

前言 有点神奇的一个题, 不太会做 思路给定 \(p_{i, c}\) 表示位置 \(i\) 是字符 \(c\) 的概率, 确定 \(\displaystyle\sum_{c = 1}^{t} p_{i, c} = 1\) 一个有效的信息被定义为任意长度为 \(k\) 的子序列都在集合 \(\mathbb{D}\) 中出现 求一个有效的信息 \(c_1c_2c_3c_4\cdot…

2.3 ~ 2.11

长达九天的寒假集训2.3 上午 模拟 欢乐赛。 ACM 赛制,10 道题; 开题。发现 A 是水,直接通过。 看 B,发现 B 就是个背包,爆了两发之后过了。 看 jijidawang 过了 G 就去看 G,发现是线段树,写了一发,没过。 发现了一些问题,改了改,没过。 开始写拍,拍出来发现自己 que…

Joker 智能开发平台再放大招,新作将彻底重塑开发模式

—— 突破传统枷锁,引领开发模式全面革新自前端可视化智能平台重磅发布后,其在行业内的影响力便如涟漪般迅速扩散。凭借着创新的设计理念和过硬的性能表现,这个平台为无数开发者和企业提供了高效且便捷的开发解决方案,收获了满满的赞誉与认可。 如今,Joker 智能开发平台又…

docker/docker-compose下存储卷的使用(CIFS和NFS)

前面说到Linux下的挂载CIFS和NFS,这里就顺道简单记一下docker/docker-compose去挂载CIFS和NFS的方式,当个笔记记录一下。docker去挂载CIFS和NFS,需要使用存储卷,稍微懂一点docker,就知道这个是什么,所以这里我们长话短说。首先,docker存储卷分为管理卷、绑定卷、临时卷。…

Deep深度算命App

Deep深度算命App应用背景与开发动机 最近,DeepSeek R1模型在AI领域引起了广泛的关注。尽管它不是当前最强大的模型之一,但在中文处理方面表现出色。春节期间,我原本计划开发一个基于AI的图像识别程序,但被周围人对DeepSeek R1用于算命应用的高度评价所吸引,尤其是我的小舅…

javaj进阶(中)

java进阶(中) 集合集合的特点可以动态保存任意多个对象,使用比较方便 提供了一系列方便的操作对象的方法:add、remove、set、getjava集合类很多,主要分为两大类,如图LInk(接口) ArrayList(Link实现类)ArrayList 是 Java 中最常用的集合类之一,它实现了 List 接口,基于…

java中awit和sleep的区别和线程安全性问题

awit和sleep的区别 从名称上来讲: awit:等待。 sleep:休眠。 从属关系上来讲: awit:awit这个方法是在对象上,只要是对象,就有这个方法 sleep:sleep是在Thread上,它是在线程上,是一个静态方法使用方式上来讲: awit只能够在同步代码中去使用 sleep可以在任意的地方中去使用从…

[PyTorch] DDP源码阅读

[PyTorch] DDP源码阅读PyTorch的DistributedDataParallel (DDP) 允许多台机器,多台GPU之间的数据并行。本文简单讲解DDP的流程,并从代码层面理解DDP如何访问底层的通信框架。DDP使用单机多进程来控制多个GPU。模型需要能放入单个GPU中。参考了PyTorch 源码解读之 DP & DD…

SQL注入之时间盲注

SQL注入之时间盲注 一、时间盲注原理 时间盲注技术的核心在于巧妙地运用数据库中的时间延迟函数(例如 MySQL 的 SLEEP() 函数或 PostgreSQL 的 PG_SLEEP() 函数)来验证注入条件的有效性。当注入条件成立时,数据库会执行这些延迟函数,从而导致页面响应时间显著增加;反之,若…