Logstash应用-同步ES(elasticsearch)到HDFS

img

1.场景分析

现有需求需要将elasticsearch的备份至hdfs存储,根据以上需求,使用logstash按照天级别进行数据的同步

2.重难点

  • 数据采集存在时间漂移问题,数据保存时使用的是采集时间而不是数据生成时间
  • 采用webhdfs无法对文件大小进行设置
  • 解决@timestamp时区问题

3.问题解决

3.1 安装webhdfs插件

./bin/logstash-plugin install logstash-output-webhdfs

3.2 logstash配置

input{elasticsearch{hosts => "xxxx:9200"index => "xxxx"#自定义查询query => '{"query": {"range": {"create_time":{"gte": 1704668760000,"lte": 1704668820000}}}}'size => 10000scroll => "5m"slices => 1user => "xxx"password => "xxxx"}
}
filter {date {  #增加@timestamp,并将记录产生时间赋值给@timestamp,时间处理默认是按照@timestamp的时间match => ["create_time","UNIX_MS"]timezone => "Asia/Shanghai"target => "@timestamp"}#增加一个timestamp,对@timestamp时间增加8小时ruby {   code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"   } #将timestamp赋值给@timestampruby {  code => "event.set('@timestamp',event.get('timestamp'))"  }#设置导入到hdfs的文件数量,需要增加一个字段,当然也可以用时间来控制文件数量,但是只有固定的几个数字,此处按照3个文件控制ruby {   code => "event.set('sync_bucket', event.get('created')%3)"   }#删除上处增加的临时字段timestamp  mutate {  remove_field => ["timestamp"]  }  
}
output {webhdfs {#高可哟集群需要配置standbystandby_host => "xxx"standby_port => 9870host => "xxxx"port => 9870path => "/hadoop/test/part_day=%{+YYYYMMdd}/logstash-%{sync_bucket}.log"#按照时间控制文件生成数量,+a是上下午的意思#path => "/hadoop/dm_dw/on/ods/ods_cc_es_initLogPro_di/part_day=%{+YYYYMMdd}/logstash-%{+a}.log"user => "hadoop"compression => "gzip"idle_flush_time => 60codec => "jsonlines"}}

logstash时间处理官网:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
比较不错的logstash介绍网站:https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/get_start/full_config.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/340554.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ future/promise/thread/async/packaged_task入门

std::promise、future、thread、async、packaged_task这些到底是个啥? 两种获取异步结果的方式 std::future std::future是一个同步原语,它代表了一个异步操作的结果。这个结果可能来自另一个线程、任务或者异步操作,而std::future提供了一种…

【LeetCode:200. 岛屿数量 | DFS 】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

使用微信读书高效阅读论文,自带翻译功能。

下面以“向文本到图像扩散模型添加条件控制”(Adding Conditional Control to Text-to-Image Diffusion Models)这篇论文示例下阅读效果。 论文地址:https://arxiv.org/abs/2302.05543 选择右侧的download PDF, 然后进入论文预览页面&#x…

记录误删除docker中极狐gitlab容器恢复过程

如题一次误操作导致删除了docker中极狐gitlab容器恢复过程 情况说明 创建容器时,我是用的是极狐官网推荐安装的步骤,具体按照官网步骤走就行 sudo docker run --detach \--hostname gitlab.example.com \--publish 443:443 --publish 80:80 --publish …

有效的回文

常用方法就是双指针。使用两个指针从字符串的两端向中间移动,同时比较对应位置的字符,直到两个指针相遇。由于题目忽略非字母和非数字的字符且忽略大小写,所以跳过那些字符,并将字母转换为小写(或大写)进行…

SpringBoot默认配置文件

✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: 循序渐进学SpringBoot ✨特色专栏: MySQL学习 🥭本文内容:SpringBoot默认配置文件 📚个人知识库: Leo知识库,欢迎大家访问 1.前言☕…

公网环境使用移动端设备+cpolar远程访问本地群晖nas上的影视资源

文章目录 1.使用环境要求:2.下载群晖videostation:3.公网访问本地群晖videostation中的电影:4.公网条件下使用电脑浏览器访问本地群晖video station5.公网条件下使用移动端(搭载安卓,ios,ipados等系统的设备…

杨中科 .NETCORE 异步编程

一、 为什么需要异步编程 异步点餐的优点:能同时服务多个客人 异步点餐一定会提升单个客户点餐速度吗? 答案理所当然:不能 图片美化服务例子服务器能够同时服务的请求数量有限 void BeautifyPic (File photo, Response response) {byte[] …

企业出海合规:如何区分数据控制者与数据处理者

什么是数据控制者? 数据控制者确定个人数据的处理目的和方式。若该企业有权决定处理个人数据的“原因”和“方式”,那么它就是数据控制者。在GDPR中,数据控制者在保护数据主体(例如网站用户)的隐私和权利方面负有最大…

【排序】归并排序(C语言实现)

文章目录 1. 递归版的归并排序1.1 归并排序的思想2. 递归版的归并排序的实现 2. 非递归版的归并排序 1. 递归版的归并排序 1.1 归并排序的思想 归并排序(MERGE - SORT)是建立在归并操作上的一种有效的排序算法, 该算法是采用分治法(Divide a…

python进行简单的app自动化测试(pywinauto)+ 截屏微信二维码

一、开始需要了解准备 1、安装 pip install pywinauto2、选择(后面会通过工具进行判断用哪个) 3、自动化控制进程的范围 示例 Application单进程 Desktop多进程 4、程序辅助检测工具 3中的下载连接 链接 点击放大镜拖到对应位置即可 二、简单的开始…

函数指针和回调函数

文章目录 一.函数指针1.什么是函数指针2.函数指针的形式3.函数指针的用途。1.调用函数2.作为参数进行传递 二.函数指针数组三.回调函数 一.函数指针 1.什么是函数指针 函数指针是指向函数的指针。在C语言和C中,函数指针可以用来存储函数的地址,并且可以…