logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

logstash 与ElasticSearch:从CSV文件到搜索宝库的导入指南

使用 logstash 导入数据到 ES 时,由三个步骤组成:input、filter、output。整个导入过程可视为:unix 管道操作,而管道中的每一步操作都是由 “插件” 实现的。使用 ./bin/logstash-plugin list 查看 logstash 已安装的插件。

每个插件的选项都可以在官网查询,先明确是哪一步操作,然后去官方文档看是否有相应的插件是否支持这种操作。比如 output 配置选项:plugins-outputs-elasticsearch-options),其中的 doc_id 选项就支持 指定 docid 写入 ES。在这里,简要说明一些常用的插件,要想了解它们实现的功能可参考官方文档。

  1. mutate 插件 用于字段文本内容处理,比如 字符替换
  2. csv 插件 用于 csv 格式文件导入 ES
  3. convert 插件 用于字段类型转换
  4. date 插件 用于日期类型的字段处理

使用 logstash 导入时,默认以 “message” 标识 每一行数据,并且会生成一些额外的字段,比如 @version、host、@timestamp,如果用不着,这些字段可以去除掉 ,此外,要注意 ES 中的索引的格式 (Mapping 结构),最好是指定自定义的索引模板,保证索引最 “精简”。

另外这里记录一些常用的参数及其作用,更具体的解释可查看官方文档。

  1. sincedb_path 告诉 logstash 记录文件已经处理到哪一行了,从而当 logstash 发生故障重启时,可从故障点处开始导入,避免从头重新导入。
  2. remove_field 删除某些字段

配置文件完成后,执行以下命令./bin/logstash -f csvfile_logstash.conf 即可启动 logstash 执行导入操作。

以下是各种错误解决:
错误一:

ConfigurationError”, :message=>”Expected one of #, input, filter, output at line 1, column 1

如果 配置文件内容是正确的,用 Notepad++ 检查一下文件的编码,确保是:UTF-8 无 BOM 格式编码

1.解决 SOH 分隔符问题

由于 csv 插件的 separator 选项不支持转义字符,因此无法用\u0001来代表 SOH。如果 csv 文件以 SOH 分隔符 (\u0001) 分割,一种方案是使用 mutate 插件替换,将\u0001替换成逗号。如下所示:

    mutate{# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号gsub => [ "message","\u0001","," ]# @timestamp 字段是默认生成的, 名称修改成 createdrename => ["@timestamp", "created"]}

但是实际上 logstash6.8.3 是支持按 SOH 分割的。在 Linux shell 下,先按 ctrl+v,再按 ctrl+a,输入的就是 SOH。那么在 vim 中打开配置文件,在 vim 的 insert 模式下,先按 ctrl+v,再按 ctrl+a,将 SOH 作为 csv 插件的 separator 分割符。

    csv {# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)separator => ""columns => ["topsid", "title"]# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)remove_field => ["host", "@timestamp", "@version", "message","path"]}	

一个将 csv 文件内容导入 ES 的示例配置模板如下:(csv 文件中的每一行以 SOH 作为分割符)

  • logstash input 插件支持多种数据来源,比如 kafka、beats、http、file 等。在这里我们的数据来源是文件,因此采用了 logstash input file 插件。
  • 把数据从文件中读到 logstash 后,可能需要对文件内容 / 格式 进行处理,比如分割、类型转换、日期处理等,这由 logstash filter 插件实现。在这里我们进行了文件的切割和类型转换,因此使用的是 logstash filter csv 插件和 mutate 插件。
  • 处理成我们想要的字段后,接下来就是导入到 ES,那么就需要配置 ES 的地址、索引名称、Mapping 结构信息 (使用指定模板写入),这由 logstash output 插件实现,在这里我们把处理后的数据导入 ES,因此使用的是 logstash output elasticsearch 插件。
input {file {path => "/data/psj/test/*.csv"start_position => "beginning"sincedb_path => "/dev/null"}
}filter {csv {# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)separator => ""columns => ["topsid", "title"]# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)remove_field => ["host", "@timestamp", "@version", "message","path"]}			mutate {convert => {# 类型转换"topsid" => "integer""title" => "string"}}
}output {elasticsearch {hosts => "http://http://127.0.0.1:9200"index => "chantitletest"# 指定 文档的 类型为 "_doc"document_type => "_doc"# 指定doc id 为topsid字段的值document_id => "%{topsid}"manage_template => true# 使用自定义的模板写入,否则将会以logstash默认模板写入template => "/data/services/logstash-6.8.3/config/chantitletpe.json"template_overwrite => truetemplate_name => "chantitletpe"}stdout{codec => json_lines}
}

(也可以采用 logstash filter 插件的 mutate 选项 将 SOH 转换成逗号):

filter {mutate{# 每一行内容默认是message, 将分隔符 \u0001 替换成 逗号gsub => [ "message","\u0001","," ]# @timestamp 字段是默认生成的, 名称修改成 createdrename => ["@timestamp", "created"]}csv {# 每行按逗号分割, 生成2个字段: topsid 和 title, (如果分割超过2列了,第三列则以 column3 命名)separator => ","columns => ["topsid", "title"]# 删除一些不需要索引到ES中去的字段(logstash默认生成的一些字段)remove_field => ["host", "@timestamp", "@version", "message","path"]}			mutate {convert => {# 类型转换"topsid" => "integer""title" => "string"}}
}

使用的自定义模板如下:

{"index_patterns": ["chantitle_v1","chantitletest"],"settings": {"number_of_shards": 3,"analysis": {"analyzer": {"my_hanlp_analyzer": {"tokenizer": "my_hanlp"},"pinyin_analyzer": {"tokenizer": "my_pinyin"}},"tokenizer": {"my_hanlp": {"enable_normalization": "true","type": "hanlp_standard"},"my_pinyin": {"keep_joined_full_pinyin": "true","lowercase": "true","keep_original": "true","remove_duplicated_term": "true","keep_first_letter": "false","keep_separate_first_letter": "false","type": "pinyin","limit_first_letter_length": "16","keep_full_pinyin": "true"}}}},"mappings": {"_doc": {"properties": {"created": {"type": "date","doc_values": false,"format": "yyyy-MM-dd HH:mm:ss"},"title": {"type": "text","fields": {"pinyin": {"type": "text","boost": 10,"analyzer": "pinyin_analyzer"},"raw": {"type": "keyword","doc_values": false}},"analyzer": "my_hanlp_analyzer"},"topsid": {"type": "long","doc_values": false}}}}
}

上面给了一个 csv 文件导入 ES,这里再给个 txt 文件导入 ES 吧。txt 以逗号分割,每列的内容都在冒号里面,只需要前 4 列内容,一行示例数据如下:

“12345”,“12345”,“研讨区”,“12345”,“500”,“xxxx”,“2008-08-04 22:20:24”,“0”,“300”,“0”,“5”,“0”,“”,“0”,“0”,“”,“”,“0”,“0”

这里采用的是 logstash filter 的 dissect 插件。相比于 grok 插件,它的优点不是采用正规匹配的方式解析数据,速度较快,但不能解析复杂数据。只能够对较为规律的数据进行导入。logstash 配置文件如下:

input {file {path => "/data/psj/test/*.txt"start_position => "beginning"# sincedb_path => "/dev/null"}
}filter {dissect {mapping => {# 插件输入的每一行数据默认名称是message,由于每列数据在双引号里面,因此解析前4列数据的写法如下:"message" => '"%{topsid}","%{subsid}","%{subtitle}","%{pid}"'}# 删除自动生成的、用不着的一些字段remove_field => ["host", "@timestamp", "@version", "message","path"]convert_datatype => {# 类型转换"topsid" => "int""subsid" => "int""pid" => "int"}}
}output {elasticsearch {hosts => "http://127.0.0.1:9200"index => "chansubtitletest"document_type => "_doc"# 指定doc id 为topsid字段的值document_id => "%{subsid}"manage_template => true# 使用自定义的模板写入,否则将会以logstash默认模板写入template => "/data/services/logstash-6.8.3/config/chansubtitle.json"template_overwrite => truetemplate_name => "chansubtitle"}stdout{codec => json_lines}
}ubsid}"manage_template => true# 使用自定义的模板写入,否则将会以logstash默认模板写入template => "/data/services/logstash-6.8.3/config/chansubtitle.json"template_overwrite => truetemplate_name => "chansubtitle"}stdout{codec => json_lines}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/150898.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机毕设小程序案例】基于SpringBoot的小演员招募小程序

前言:我是IT源码社,从事计算机开发行业数年,专注Java领域,专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 👉IT源码社-SpringBoot优质案例推荐👈 👉IT源码社-小程序优质案例…

yarn install 这个命令安装如何加速

yarn install 命令用来安装项目依赖,其速度受多种因素影响,如网络速度、npm/yarn包的源服务器、以及本地缓存等。以下是一些可能帮助你加速 yarn install 的方法: 1. 使用国内镜像 如果你在中国,可以使用淘宝的 npm 镜像&#x…

buuctf_练[CISCN2019 华东南赛区]Web4

[CISCN2019 华东南赛区]Web4 文章目录 [CISCN2019 华东南赛区]Web4掌握知识解题思路代码分析正式解题 关键paylaod 掌握知识 ​ 根据url地址传参结构来判断php后端还是python后端;uuid.getnode()函数的了解,可以返回主机MAC地址十六进制;pyt…

【STM32】GPIO控制LED(HAL库版)

STM32最新固件库v3.5/Libraries/CMSIS/CM3/DeviceSupport/ST/STM32F10x/system_stm32f10x.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.com) STM32最新固件库v3.5/Libraries/STM32F10x_StdPeriph_Driver/src/stm32f10x_gpio.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.…

行业追踪,2023-10-26

自动复盘 2023-10-26 凡所有相,皆是虚妄。若见诸相非相,即见如来。 k 线图是最好的老师,每天持续发布板块的rps排名,追踪板块,板块来开仓,板块去清仓,丢弃自以为是的想法,板块去留让…

c++的4中类型转换操作符(static_cast,reinterpret_cast,dynamic_cast,const_cast),RTTI

目录 引入 介绍 static_cast 介绍 使用 reinterpret_cast 介绍 使用 const_cast 介绍 使用 dynamic_cast 介绍 使用 RTTI(运行时确定类型) 介绍 typeid运算符 dynamic_cast运算符 type_info类 引入 原本在c中,我们就已经接触到了很多类型转换 -- 隐式类型转…

虹科 | 解决方案 | 汽车示波器 索赔管理方案

索赔管理 Pico汽车示波器应用于主机厂/供应商与服务店/4S店的协作,实现产品索赔工作的高效管理;同时收集的故障波形数据,便于日后的产品优化和改进 故障记录 在索赔申请过程中,Pico汽车示波器的数据记录功能可以用于捕捉故障时的…

element-ui vue2 iframe 嵌入外链新解

效果如图 实现原理 在路由中通过 props 传值 {path: /iframe,component: Layout,meta: { title: 小助手, icon: example },children: [{path: chatglm,name: chatglm,props: { name: chatglm,url: https://chatglm.cn },component: () > import(/views/iframe/common),me…

shell脚本变量

目录 1.变量的定义 2.shell脚本中变量的定义方法 3.变量的转译 4.Linux中命令的别名设定 5.用户环境变量的更改 6.利用命令的执行结果设定变量 7.脚本函数 1.变量的定义 1)定义本身 变量就是内存一片区域的地址 2)变量存在的意义 命令无法操作一直变化的目…

Sql Server中的表组织和索引组织(聚集索引结构,非聚集索引结构,堆结构)

正文 SqlServer用三种方法来组织其分区中的数据或索引页: 1、聚集索引结构 聚集索引是按B树结构进行组织的,B树中的每一页称为一个索引节点。每个索引行包含一个键值和一个指针。指针指向B树上的某一中间级页(比如根节点指向中间级节点中的…

六零导航页SQL注入漏洞复现(CVE-2023-45951)

0x01 产品简介 LyLme Spage(六零导航页)是中国六零(LyLme)开源的一个导航页面。致力于简洁高效无广告的上网导航和搜索入口,支持后台添加链接、自定义搜索引擎,沉淀最具价值链接,全站无商业推广…

Druid 任意文件读取 (CVE-2021-36749)

Druid 任意文件读取 (CVE-2021-36749) 漏洞描述 由于用户指定 HTTP InputSource 没有做出限制,可以通过将文件 URL 传递给 HTTP InputSource 来绕过应用程序级别的限制。攻击者可利用该漏洞在未授权情况下,构造恶意请求执行文件…