Elasticsearch:使用 MongoDB connector 同步数据到 Elasticsearch

MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。Elasticsearch 是一个高效强大的大数据搜索引擎。它的 Speed, Scale 及 Relevance 是很多数据库不具有的。通过 Elasticsearch 的高效搜索引擎,我们可以快速地搜索相关的内容,我们也可以使用 Kibana 所提供的可视化为数据提供洞察。在今天的文章中,我将介绍如何把 MongoDB 里的数据同步到 Elasticsearch。我将展示如何使用 MongoDB connector 来实现这个功能。

在今天的练习中,我将使用如下的架构:

如上所示,我们把 MongoDB 里的数据集通过 MongoDB 连接器同步到 Elasticsearch 中。在今天的展示中,我将使用 Elastic Stack 8.13.2 来进行展示。

安装

MongoDB

我们可以按照链接来进行安装。我们在 Ubuntu OS 22.04 上进行安装。

在终端中,如果尚未安装,安装 gnupg 和 curl:

sudo apt-get install gnupg curl

要导入 MongoDB 的公共 GPG 密钥,请运行以下命令:

curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc | \sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg \--dearmor

创建一个 MongoDB 的列表文件。为你的 Ubuntu 版本创建列表文件 /etc/apt/sources.list.d/mongodb-org-7.0.list

echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list

发出以下命令以重新加载本地软件包数据库:

sudo apt-get update

你可以安装 MongoDB 的最新稳定版本,也可以安装特定版本的 MongoDB。

sudo apt-get install -y mongodb-org

我们需要针对 MongoDB 进行配置:

/etc/mongod.conf

我们可以使用如下的命令来启动 mongodb:

sudo service mongod start

我们可以使用如下的命令来查看 mongodb 的状态:

service mongod status

我们可以使用如下的命令来创建一个数据库:

mongosh 192.168.0.9
parallels@ubuntu2204:~$ mongosh 192.168.0.9 
Current Mongosh Log ID:	661fc4d5b790b167823b9273
Connecting to:		mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
Using MongoDB:		7.0.8
Using Mongosh:		2.2.4For mongosh info see: https://docs.mongodb.com/mongodb-shell/------The server generated these startup warnings when booting2024-04-17T20:40:52.680+08:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem2024-04-17T20:40:53.345+08:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted2024-04-17T20:40:53.345+08:00: vm.max_map_count is too low
------test> use mydatabase
switched to db mydatabase
mydatabase> show dbs
admin   40.00 KiB
config  12.00 KiB
local   40.00 KiB
mydatabase> db.mycol.insert({"name":"liuxg"})
DeprecationWarning: Collection.insert() is deprecated. Use insertOne, insertMany, or bulkWrite.
{acknowledged: true,insertedIds: { '0': ObjectId('661fc5a3b790b167823b9274') }
}
mydatabase> show dbs
admin       40.00 KiB
config      12.00 KiB
local       40.00 KiB
mydatabase   8.00 KiB
mydatabase> 
mydatabase> db.posts.insertOne({
...   title: "Post Title 1",
... })
{acknowledged: true,insertedId: ObjectId('6627ac96b790b167823b9275')
}
mydatabase> show dbs
admin       40.00 KiB
config      72.00 KiB
local       40.00 KiB
mydatabase  48.00 KiB
mydatabase> show dbs
admin       40.00 KiB
config      72.00 KiB
local       40.00 KiB
mydatabase  48.00 KiB
mydatabase> show collections
mycol
posts
mydatabase> show tables
mycol
posts
mydatabase> db.posts.find()
[{ _id: ObjectId('6627ac96b790b167823b9275'), title: 'Post Title 1' }
]

在默认的情况下,MongoDB 的安装是没有任何安全的。我们可以参考链接来配置 MongoDB 的安全。我们必须使用如下的步骤来进行:

  1. 创建管理用户。这个需要针对 admin 数据库来操作
  2. 启动安全,并为自己的数据库创建用户

我们首先为 admin 数据库添加一个用户:liuxg/password:

use admin
db.createUser({user: "liuxg",pwd: "password",roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]}
)

如果你想在运行时设置密码,你可以使用如下的命令:

use admin
db.createUser({user: "liuxg",pwd: passwordPrompt(),roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]}
)
$ mongosh mongodb://192.168.0.9:27017 
Current Mongosh Log ID:	6629cb4422076463783b9273
Connecting to:		mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
Using MongoDB:		7.0.8
Using Mongosh:		2.2.4For mongosh info see: https://docs.mongodb.com/mongodb-shell/------The server generated these startup warnings when booting2024-04-25T11:17:14.139+08:00: Using the XFS filesystem is strongly recommended with the WiredTiger storage engine. See http://dochub.mongodb.org/core/prodnotes-filesystem2024-04-25T11:17:14.980+08:00: Access control is not enabled for the database. Read and write access to data and configuration is unrestricted2024-04-25T11:17:14.980+08:00: vm.max_map_count is too low
------test> use admin
switched to db admin
admin> db.createUser(
...   {
...     user: "liuxg",
...     pwd: "password",
...     roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]
...   }
... )
{ ok: 1 }

我们接下来退出来。我们需要配置 MongoDB。我们找到配置文件 /etc/mongodb.conf

/etc/mongodb.conf

我们需要启动安全。修改完毕后,我们再次使用如下的命令来启动 MongodDB:

parallels@ubuntu2204:~$ sudo vi /etc/mongod.conf 
parallels@ubuntu2204:~$ sudo service mongod restart
parallels@ubuntu2204:~$ service mongod status
● mongod.service - MongoDB Database ServerLoaded: loaded (/lib/systemd/system/mongod.service; disabled; vendor preset: enabled)Active: active (running) since Thu 2024-04-25 11:21:10 CST; 2s agoDocs: https://docs.mongodb.org/manualMain PID: 58182 (mongod)Memory: 169.9MCPU: 897msCGroup: /system.slice/mongod.service└─58182 /usr/bin/mongod --config /etc/mongod.conf

我们再次使用如下的命令来进行 CLI:

parallels@ubuntu2204:~$ mongosh mongodb://192.168.0.9:27017 
Current Mongosh Log ID:	6629cc5699b3c8d57d3b9273
Connecting to:		mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.4
Using MongoDB:		7.0.8
Using Mongosh:		2.2.4For mongosh info see: https://docs.mongodb.com/mongodb-shell/test> use mydatabase
switched to db mydatabase
mydatabase> db.posts.find({})
MongoServerError[Unauthorized]: Command find requires authentication
mydatabase> db.auth("liuxg", "password")
MongoServerError[AuthenticationFailed]: Authentication failed.
mydatabase> use admin
switched to db admin
admin> db.posts.find({})
MongoServerError[Unauthorized]: Command find requires authentication
admin> use admin
already on db admin
admin> db.auth("liuxg", "password")
{ ok: 1 }
admin> use mydatabase
switched to db mydatabase
mydatabase> db.createUser(
...   {
...     user: "elastic",
...     pwd: "123456",
...     roles: [ { role: "readWrite", db: "mydatabase" } ]
...   }
... )

如果我们没有使用 db.auth 来进行验证,那么我们不能针对 mydatabase 进行任何的操作。我们验证成功后,就可以为 mydatabase 进行创建用户。如上所示,我们创建了一个叫做 elastic/123456 的用户。

我们也可以通过 elastic/123456 来进行验证:

parallels@ubuntu2204:~$ mongosh mongodb://192.168.0.9:27017 
Current Mongosh Log ID:	663a2db6f8be5e933f99ea71
Connecting to:		mongodb://192.168.0.9:27017/?directConnection=true&appName=mongosh+2.2.5
Using MongoDB:		7.0.8
Using Mongosh:		2.2.5For mongosh info see: https://docs.mongodb.com/mongodb-shell/test> use mydatabase
switched to db mydatabase
mydatabase> db.auth("elastic", "password")
MongoServerError[AuthenticationFailed]: Authentication failed.
mydatabase> db.auth("elastic", "123456")
{ ok: 1 }
mydatabase> show tables
mycol
posts

Java 安装

你需要安装 Java。版本在 Java 8 或者 Java 11。我们可以参考链接来查找需要的 Java 版本。

Elasticsearch

我们可参考我之前的文章 “如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch” 来安装 Elasticsearch。特别地,我们需要按照 Elastic Stack 8.x 的安装指南来进行安装。

在 Elasticsearch 终端输出中,找到 elastic 用户的密码和 Kibana 的注册令牌。 这些是在 Elasticsearch 第一次启动时打印的。

我们记下这个密码,并在下面的配置中进行使用。同时它也会生成相应的证书文件:

$ pwd
/Users/liuxg/elastic/elasticsearch-8.13.2/config/certs
$ ls
http.p12      http_ca.crt   transport.p12

保存密码、注册令牌和证书路径名。 你将在后面的步骤中需要它们。如果你对这些操作还不是很熟的话,请参考我之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单”。

安装 Kibana

我们接下来安装 Kibana。我们可以参考我之前的文章 “如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana” 来进行我们的安装。特别地,我们需要安装 Kibana 8.2 版本。如果你还不清楚如何安装 Kibana 8.2,那么请阅读我之前的文章 “Elastic Stack 8.0 安装 - 保护你的 Elastic Stack 现在比以往任何时候都简单”。在启动 Kibana 之前,我们可以修改 Kibana 的配置文件如下。添加如下的句子到 config/kibana.yml 中去:

config/kibana.yml

enterpriseSearch.host: http://localhost:3002

Enterprise search 安装

我们在地址 Download Elastic Enterprise Search | Elastic 找到我们需要的版本进行下载。并按照页面上相应的指令来进行按照。如果你想针对你以前的版本进行安装的话,请参阅地址 https://www.elastic.co/downloads/past-releases#app-search。

等我们下载完 Enterprise Search 的安装包,我们可以使用如下的命令来进行解压缩:

$ pwd
/Users/liuxg/elastic
$ ls
elastic-agent-8.13.2-darwin-aarch64.tar.gz kibana-8.13.2
elasticsearch-8.13.2                       kibana-8.13.2-darwin-aarch64.tar.gz
elasticsearch-8.13.2-darwin-aarch64.tar.gz logstash-8.13.2-darwin-aarch64.tar.gz
enterprise-search-8.13.2.tar.gz            metricbeat-8.13.2-darwin-aarch64.tar.gz
filebeat-8.13.2-darwin-aarch64.tar.gz
$ tar xzf enterprise-search-8.13.2.tar.gz 
$ cd enterprise-search-8.13.2
$ ls
LICENSE    NOTICE.txt README.md  bin        config     lib        metricbeat

如上所示,它含有一个叫做 config 的目录。我们在启动  Enterprise Search 之前,必须做一些相应的配置。我们需要修改 config/enterprise-search.yml 文件。在这个文件中添加如下的内容:

config/enterprise-search.yml

allow_es_settings_modification: true
# secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
elasticsearch.username: elastic
elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
elasticsearch.host: https://127.0.0.1:9200
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
kibana.external_url: http://localhost:5601

在上面,请注意 elasticsearch.password 是我们在 Elasticsearch 安装过程中生成的密码。elasticsearch.ssl.certificate_authority 必须根据自己的 Elasticsearch 安装路径中生成的证书进行配置。在上面的配置中,如果我们还没有配置 secret_management.encryption_keys。我们可以使用上面的配置先运行,然后让系统帮我们生成。在配置上面的密码时,我们需要添加上引号。我发现在密码中含有 * 字符会有错误的信息。我们使用如下的命令来启动:

bin/enterprise-search

allow_es_settings_modification: true
# secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
secret_management.encryption_keys: [8641aa589cf5d6a90250e25a875eeda20052f049999e3f31f9265c7444ad703a]
elasticsearch.username: elastic
elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
elasticsearch.host: https://127.0.0.1:9200
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
kibana.external_url: http://localhost:5601

我们再次运行:

bin/enterprise-search

allow_es_settings_modification: true
# secret_management.encryption_keys: ['q3t6w9z$C&F)J@McQfTjWnZr4u7x!A%D']
secret_management.encryption_keys: [8641aa589cf5d6a90250e25a875eeda20052f049999e3f31f9265c7444ad703a]
elasticsearch.username: elastic
elasticsearch.password: "VDMlz5QnM_0g-349fFq7"
elasticsearch.host: https://127.0.0.1:9200
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt
kibana.external_url: http://localhost:5601secret_session_key: f5b111b4272b3593c08599a4dfd236676e2b74ece5c805eef19e528d4c5191e950b62c5499384d0690ffdef52d31e03a5cd23c49718531aa3a2f59aa80ff40d2
feature_flag.elasticsearch_search_api: true

为了能够使得我们能够在 App Search 中使用 Elasticsearch 搜索,我们必须设置
feature_flag.elasticsearch_search_api: true。 我们再次重新启动 enterprise search:

再次重新启动:

bin/enterprise-search

这次启动后,我们再也不会看到任何的配置输出了。这样我们的 enterprise search 就配置好了。

至此我们的安装已经完成了。

同步数据到 Elasticsearch

我们登录到 Kibana,然后转到 Search -> Content -> Indices -> Create a new index

我们接下来创建一个 Connector:

接下来,我们需要按照 Kibana 中的指示来进行操作:

我们首先需要申请一个 API key:

我们需要克隆 elastic/connectors 仓库:

git clone https://github.com/elastic/connectors

我们进入到该仓库代码的根目录下,并把 config.yml.example 文件拷贝到 config.yml 文件中。我们也同时拷贝 Elasticsearch 的证书到这个目录下:

$ pwd
/Users/liuxg/python/connectors
$ ls
Dockerfile                 catalog-info.yaml          requirements
Dockerfile.ftest           config.yml.example         resources
LICENSE                    connectors                 scripts
MANIFEST.in                docs                       setup.py
Makefile                   logo-enterprise-search.png tests
NOTICE.txt                 pyrightconfig.json
README.md                  pytest.ini
$ cp config.yml.example config.yml
$ cp ~/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt .

config.yml

connectors:
-connector_id: "7V37Do8Bjfz0jsfahzL3"service_type: "mongodb"
elasticsearch:host: "https://192.168.0.3:9200"api_key: "N2wzX0RvOEJqZnowanNmYWhqS0k6eVZtSDJMVFFRRS1UZjFacWI2eUhPUQ=="ssl: trueusername: "elastic"password: "VDMlz5QnM_0g-349fFq7"  ca_certs: "/usr/share/certs/http_ca.crt"service:idling: 30heartbeat: 300max_errors: 20max_errors_span: 600max_concurrent_content_syncs: 1max_concurrent_access_control_syncs: 1job_cleanup_interval: 300log_level: INFOsources:mongodb: connectors.sources.mongo:MongoDataSource

你需要根据自己的配置进行相应的修改。请注意上面的 connector_id 需要根据上面界面生成的值进行相应的修改。

根据链接的描述,我们有两种方法来进行部署。我们选择使用 docker 来进行部署。我们在刚才创建的 config.yml 目录下进行运行:

docker run \
-v ~/python/connectors:/config \
--volume="$PWD/http_ca.crt:/usr/share/certs/http_ca.crt:ro" \
--rm \
--tty -i \
--network host \
docker.elastic.co/enterprise-search/elastic-connectors:8.13.0.0-SNAPSHOT \
/app/bin/elastic-ingest \
-c /config/config.yml

运行完上面的命令后,我们可以在 Kibana 中看到:

上面显示我们的 connector 已经成功地连接了。

我们也可以使用自己数据库上的账号:

在进行下面的同步之前,我们可以先创建一个索引 test,当然也可以使用之前创建的 mydatabase:

一旦配置完毕,我们可以看到如下的画面:

点击上面的 Sync 按钮:

等 Sync 完毕后,我们可以在 Docker 的 terminal 中看到如下的画面:

我们可以到 Kibana 中进行查看文件:

GET test/_search

我们可以看到 test 现在含有通过过来的文档。

好了,现在我们已经完成了从 MongoDB 到 Elasticsearch 的同步了。我们可以很方便地把 MongoDB 的数据通过 Connector 同步到 Elasticsearch。希望这篇文章对大家的开发有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/675468.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LTE的EARFCN和band之间的对应关系

一、通过EARFCN查询对应band 工作中经常遇到只知道EARFCN而需要计算band的情况,因此查了相关协议,找到了他们之间的对应关系,可以直接查表,非常方便。 具体见: 3GPP TS 36.101 5.7.3 Carrier frequency and EAR…

java+jsp+Oracle+Tomcat 记账管理系统论文(完整版)

⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️⬇️ ➡️点击免费下载全套资料:源码、数据库、部署教程、论文、答辩ppt一条龙服务 ➡️有部署问题可私信联系 ⬆️⬆️⬆️​​​​​​​⬆️…

【busybox记录】【shell指令】uniq

目录 内容来源: 【GUN】【uniq】指令介绍 【busybox】【uniq】指令介绍 【linux】【uniq】指令介绍 使用示例: 去除重复行 - 默认输出 去除重复行 - 跳过第n段(空格隔开),比较n1以后的内容,去重 去…

2024-05-07 商业分析-赚钱之前怎么预估风险-记录

摘要: 2024-05-07 商业分析-赚钱之前怎么预估风险-记录 赚钱之前怎么预估风险 好,大家好,今天我们来讲的是叫什么赚钱之前怎么评估风险啊,这个问题啊提的很好啊,我们待会儿来讲啊。呃,首先呢今天这个主题呢来自于昨天…

常用算法汇总

作者:指针不指南吗 专栏:算法篇 🐾算法思维逻辑🐾 文章目录 1.判断闰年2.计算从某天到某天的天数3.二分4. 前缀和5.差分6.图论6.1dfs6.2走迷宫 7.最短路7.1dijkstra7.2foly 8.并查集9.数论9.1gcd lcm9.2判断素数(质数)9.3分解质因…

GNU Radio FFT模块结合stream to vector应用及Rotator频偏模块使用

文章目录 前言一、FFT 模块应用1、stream to vector 介绍2、创建 grc 图测试3、运行结果 二、频偏模块1、Rotator 简介2、创建 grc 图测试3、运行结果 前言 写个博客记录一下自己的蠢劲儿,之前我想用 FFT 模块做一些信号分析的东西,官方的 FFT 模块必须…

论文复现丨多车场带货物权重车辆路径问题:改进邻域搜索算法

引言 本系列文章是路径优化问题学习过程中一个完整的学习路线。问题从简单的单车场容量约束CVRP问题到多车场容量约束MDCVRP问题,再到多车场容量时间窗口复杂约束MDCVRPTW问题,复杂度是逐渐提升的。 如果大家想学习某一个算法,建议从最简单…

快速了解OV证书和DV证书的区别及使用场景

OV(Organization Validation,组织验证)证书和DV(Domain Validation,域名验证)证书都是SSL/TLS证书,用于保护网站数据传输的安全性和提供身份验证,但两者在验证深度、信任级别、提供的…

分布式锁-快速入门

文章目录 前言一、基础概念1.1 什么是锁1.2 什么是分布式锁1.3 锁和事务的区别二、分布式锁基础理论2.1 为什么要使用分布式锁2.2 分布式锁特性2.3 分布式锁的实现方式总结前言 由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题…

【软测学习笔记】Python入门Day02

🌟博主主页:我是一只海绵派大星 📚专栏分类:软件测试笔记 📚参考教程:黑马教程❤️感谢大家点赞👍收藏⭐评论✍️ python安装 1、进入Python的官方下载页面: Download Python | Py…

c#教程——索引器

前言: 索引器(Indexer)可以像操作数组一样来访问对象的元素。它允许你使用索引来访问对象中的元素,就像使用数组索引一样。在C#中,索引器的定义方式类似于属性,但具有类似数组的访问方式。 索引器&#x…

125.两两交换链表中的节点(力扣)

题目描述 代码解决及思路 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), …