使用 Logstash 丰富你的 Elasticsearch 文档

作者:来自 Elastic David Pilato

我们在上一篇文章中看到,我们可以使用摄取管道中的 Elasticsearch Enrich Processor 在 Elasticsearch® 中进行数据丰富。 但有时,你需要执行更复杂的任务,或者你的数据源不是 Elasticsearch,而是另一个源。 或者,你可能希望存储在 Elasticsearch 和第三方系统中,在这种情况下,将管道的执行转移到 Logstash® 很有意义。

使用 Elasticsearch 丰富 Elasticsearch 数据

使用 Logstash,使用类似于以下的管道,这非常容易:

input {# Read all documents from Elasticsearchelasticsearch {hosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "kibana_sample_data_logs"docinfo => trueecs_compatibility => "disabled"}
}filter {# Enrich every document with Elasticsearchelasticsearch {hosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "vip"query => "ip:%{[clientip]}"sort => "ip:desc"fields => {"[name]" => "[name]""[vip]" => "[vip]"}}mutate { remove_field => ["@version", "@timestamp"] }
}output {if [name] {# Write all modified documents to Elasticsearchelasticsearch {manage_template => falsehosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"}}
}

总共,我们有 14074 个事件需要解析。 虽然不是很多,但对于这个演示来说已经足够了。 这是一个示例事件:

{"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24","bytes": 1831,"clientip": "30.156.16.164","extension": "","geo": {"srcdest": "US:IN","src": "US","dest": "IN","coordinates": {"lat": 55.53741389,"lon": -132.3975144}},"host": "elastic-elastic-elastic.org","index": "kibana_sample_data_logs","ip": "30.156.16.163","machine": {"ram": 9663676416,"os": "win xp"},"memory": 73240,"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"","phpmemory": 73240,"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra","request": "/wp-login.php","response": 404,"tags": ["success","info"],"timestamp": "2023-03-18T12:43:49.756Z","url": "https://elastic-elastic-elastic.org/wp-login.php","utc_time": "2023-03-18T12:43:49.756Z","event": {"dataset": "sample_web_logs"}
}

正如我们在上一篇文章中看到的,vip 索引包含有关我们客户的信息:

{ "ip" : "30.156.16.164", "vip": true, "name": "David P" 
}

我们可以通过以下方式运行管道:

docker run \--name=logstash \--rm -it \-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \-e XPACK_MONITORING_ENABLED=false \-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \docker.elastic.co/logstash/logstash:8.12.0

丰富的文档现在看起来像这样:

{"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24","bytes": 1831,"clientip": "30.156.16.164","extension": "","geo": {"srcdest": "US:IN","src": "US","dest": "IN","coordinates": {"lat": 55.53741389,"lon": -132.3975144}},"host": "elastic-elastic-elastic.org","index": "kibana_sample_data_logs","ip": "30.156.16.163","machine": {"ram": 9663676416,"os": "win xp"},"memory": 73240,"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"","phpmemory": 73240,"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra","request": "/wp-login.php","response": 404,"tags": ["success","info"],"timestamp": "2023-03-18T12:43:49.756Z","url": "https://elastic-elastic-elastic.org/wp-login.php","utc_time": "2023-03-18T12:43:49.756Z","event": {"dataset": "sample_web_logs"},"vip": true,"name": "David P"
}

实际上很简单,但有一个问题:速度很慢。 通过网络进行查找,尽管 Elasticsearch 速度极快,但仍然会减慢整个管道的速度。

使用静态 JDBC 过滤器

我最近在 ParisJUG 遇到了 Laurent,他来自令人惊叹的 Elastic Consulting 团队,我们讨论了这个问题。 他告诉我,他的一位客户必须面对这个问题。 他建议改用 Logstash 中的 Elasticsearch 缓存。

问题是:Logstash 中没有这样的过滤器缓存插件。 他找到了一种非常聪明的方法来解决该问题,即利用静态 JDBC 过滤器插件和 Elasticsearch JDBC 驱动程序。

请注意,这需要拥有白金许可证(或试用版)。

添加 Elasticsearch JDBC 驱动程序

我们首先需要将 JDBC 驱动程序添加到 Logstash 实例中。

mdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

我们只需要与 Logstash docker 实例共享此目录:

time docker run \--name=logstash \--rm -it \-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \-v $(pwd)/logstash-config/lib/:/tmp/lib/ \-e XPACK_MONITORING_ENABLED=false \-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \docker.elastic.co/logstash/logstash:8.12.0

更新管道

input 部分不变。 但现在,我们要在内存中创建一个名为 vip 的临时表(为了保持一致性)。 该表结构是使用 local_db_objects 参数定义的:

jdbc_static {local_db_objects => [ {name => "vip"index_columns => ["ip"]columns => [["name", "VARCHAR(255)"],["vip", "BOOLEAN"],["ip", "VARCHAR(64)"]]} ]
}

当 jdbc_static 启动时,我们要首先从 Elasticsearch vip索引中读取所有数据集。 这是在 loaders 选项中完成的:

jdbc_static {loaders => [ {query => "select name, vip, ip from vip"local_table => "vip"} ]jdbc_user => "elastic"jdbc_password => "${ELASTIC_PASSWORD}"jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}

每次我们需要进行查找时,我们都希望使用以下语句来执行它:

SELECT name, vip FROM vip WHERE ip = "THE_IP"

这可以使用 local_lookups 参数定义:

jdbc_static {local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip"} ]
}

如果没有找到数据,我们可以使用 default_hash 选项提供默认值:

jdbc_static {local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip" default_hash => {name => nilvip => false}} ]
}

最后,这将在事件中生成 vip.name 和 vip.vip 字段。

我们现在可以定义我们想要对这些临时字段执行的操作:

jdbc_static {add_field => { name => "%{[vip][0][name]}" }add_field => { vip => "%{[vip][0][vip]}" }remove_field => ["vip"]
}

这给出了以下过滤器:

filter {# Enrich every document with Elasticsearch via static JDBCjdbc_static {loaders => [ {query => "select name, vip, ip from vip"local_table => "vip"} ]local_db_objects => [ {name => "vip"index_columns => ["ip"]columns => [["name", "VARCHAR(255)"],["vip", "BOOLEAN"],["ip", "VARCHAR(64)"]]} ]local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip" default_hash => {name => nilvip => false}} ]add_field => { name => "%{[vip][0][name]}" }add_field => { vip => "%{[vip][0][vip]}" }remove_field => ["vip"]jdbc_user => "elastic"jdbc_password => "${ELASTIC_PASSWORD}"jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"}mutate { remove_field => ["@version", "@timestamp"] }
}

将修改后的文档写入Elasticsearch

在第一个管道中,我们测试事件中是否确实存在名称字段:

if [name] {# Index to Elasticsearch
}

我们仍然可以使用类似的东西,但因为我们提供了默认值,以防在 Elasticsearch vip 索引中找不到 ip,所以现在它会在标签表中生成一个新的 _jdbcstaticdefaultsused 标签。

我们可以用它来知道我们是否发现了某些东西,如果是前者,则将我们的数据发送到 Elasticsearch:

output {if "_jdbcstaticdefaultsused" not in [tags] {# Write all the modified documents to Elasticsearchelasticsearch {manage_template => falsehosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"}}
}

更快吗?

因此,当我们在这个小数据集上运行测试时,我们可以看到,使用 Elasticsearch 过滤器方法,需要两分钟多一点的时间来丰富我们的数据集:

real    2m3.146s
user    0m0.077s
sys     0m0.042s

当使用 JDBC 静态过滤器方法运行管道时,现在只需不到一分钟:

real    0m48.575s
user    0m0.064s
sys     0m0.039s

正如我们所看到的,我们显着减少了该丰富管道的执行时间(增益约为 60%)。

如果你有一个可以轻松放入 Logstash JVM 内存的小型 Elasticsearch 索引,你可以尝试此策略(或类似的策略)。 如果你有数亿个文档,你仍然应该使用 Elasticsearch Filter Plugin。

结论

在这篇文章中,我们了解了当我们需要在 Elasticsearch 中执行一些查找时,如何使用 JDBC 静态过滤器插件来加速数据丰富管道。 在下一篇文章中,我们将了解如何使用 Elastic Agent 在边缘进行类似的丰富。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付

更多阅读:

  • Logstash:Jdbc static filter plugin 介绍

  • Logstash:运用 jdbc_streaming 来丰富我们的数据

原文:Enrich your Elasticsearch documents with Logstash | Elastic Blog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/520198.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[设计模式Java实现附plantuml源码~行为型]定义算法的框架——模板方法模式

前言: 为什么之前写过Golang 版的设计模式,还在重新写Java 版? 答:因为对于我而言,当然也希望对正在学习的大伙有帮助。Java作为一门纯面向对象的语言,更适合用于学习设计模式。 为什么类图要附上uml 因为很…

C++数据结构与算法——二叉树的属性

C第二阶段——数据结构和算法,之前学过一点点数据结构,当时是基于Python来学习的,现在基于C查漏补缺,尤其是树的部分。这一部分计划一个月,主要利用代码随想录来学习,刷题使用力扣网站,不定时更…

【Kaggle】练习赛《肥胖风险的多类别预测》

前言 作为机器学习的初学者,Kaggle提供了一个很好的练习和学习平台,其中有一个栏目《PLAYGROUND》,可以理解为游乐场系列赛,提供有趣、平易近人的数据集,以练习他们的机器学习技能,并每个月都会有一场比赛…

cefsharp(winForm)调用js脚本,js脚本调用c#方法

本博文针对js-csharp交互(相互调用的应用) (一)、js调用c#方法 1.1 类名称:cs_js_obj public class cs_js_obj{//注意,js调用C#,不一定在主线程上调用的,需要用SynchronizationContext来切换到主线程//private System.Threading.SynchronizationContext context;//…

Node 旧淘宝源 HTTPS 过期处理

今天拉取老项目更新依赖,出现 urlshttps%3A%2F%2Fregistry.npm.taobao.org%2Fegg-logger%2Fdownload%2Fegg-logger-2.6.1.tgz: certificate has expired 类似报错。即使删除 node_modules 重新安装,问题依然无法解决。 一、问题演示 二、原因分析 1、淘…

Git误操作补救错失:恢复误删的本地分支、将某个提交从一个分支复制到另一个分支

一、恢复误删的本地分支 作为一枚强迫症,没用的分支总是喜欢及时删删删删掉删掉统统删掉,结果今天发现有些分支还是应该保留。 比如,①前段时间切了个分支用来专门做图表,但因为需求还没有最终确定,已经上线了测试服而…

【黑马程序员】C++项目之机房预约管理系统实战

文章目录 需求系统简介身份介绍机房介绍申请简介系统具体需求 实现菜单与退出功能实现功能测试 创建身份类创建角色基类创建学生类创建教师类创建管理员类 登录模块功能描述登录函数封装各个校色具体登录验证管理员操作界面调用流程 管理员模块构造函数实现管理员子菜单显示添加…

YOLO算法改进Backbone系列之:EdgeViT

摘要:在计算机视觉领域,基于Self-attention的模型(如ViTs)已经成为CNN之外的一种极具竞争力的架构。尽管越来越强的变种具有越来越高的识别精度,但由于Self-attention的二次复杂度,现有的ViT在计算和模型大小方面都有较高的要求。…

人力资源档案和人力资源软件的区别

人力资源档案和人力资源软件是两个不同的概念,可以从以下几个方面进行区分: 1. 定义:人力资源档案是指记录和管理员工个人信息、履历、合同、培训记录等各种人力资源相关文件的集合,主要以纸质或电子形式保存。而人力资源软件是指…

CIA402协议笔记

文章目录 1、对象字典1.1 Mode of Operation( 606 0 h 6060_h 6060h​)1.2 Modes of opration display( 606 1 h ) 6061_h) 6061h​) 2、状态机2.1 控制字(ControlWord、6040h)2.2 状态字(StatusWord、6041h)2.3 shutd…

PyCharm连接远程服务器(保姆级教程)

第一步:配置解释器 File→Settings→Project:xxx→Python Interpreter 增加新的解释器 选择SSH Interpreter 输入远程服务器ip地址、你在服务器上的用户名 输入密码 选择在服务器上配置好的环境 更改映射目录 第二步:部署 Tools→…

网络工程师笔记8

华为VRP系统 设备管理方式 web管理方式 命令行管理方式 修改命令:undo 基础配置命令