【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为那时候的 CPU 以及内存占用率较低。
  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;/*** 清理ES历史数据定时任务*/
@Component
public class CleanESHistoryDataTask {private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);@Resourceprivate RestHighLevelClient restHighLevelClient;/*** 根据索引名称删除当前日期两个月前的那一天的历史文档数据* @param jobContext*/@Scheduledpublic void cleanESHistoryData(JobContext jobContext) {// jobContext为定时任务中回传数据String indexName = jobContext.getData();if (StringUtils.isBlank(indexName)) {LOGGER.warn("ES索引名称不能为空!");return;}long startTimeMillis = System.currentTimeMillis();String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);try {String startTimeStr = twoMonthsAgoDate + " 00:00:00";// 初始化时间,形如2023-08-06 00:00:00Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);// 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据for (int i = 0; i < 24; i++) {Date startTime = initialStartTime;startTime = DateUtils.addHours(startTime, i);Date endTime = DateUtils.addHours(startTime, 1);LOGGER.info("正在清理索引:[{}],时间:{} 至 {}的历史文档数据...", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));long currentStartTimeMillis = System.currentTimeMillis();// 指定操作的索引库SearchRequest searchRequest = new SearchRequest(indexName);// 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("createTimeStr.keyword").from(DateTool.format(startTime, DF_FULL)).to(DateTool.format(endTime, DF_FULL))).size(1000);// 设置滚动查询结果在内存中的过期时间为1minScroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));// 将滚动以及构造的查询条件放入查询请求searchRequest.scroll(scroll).source(searchSourceBuilder);SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);// 记录要滚动的IDString scrollId = searchResponse.getScrollId();SearchHit[] hits = searchResponse.getHits().getHits();while (hits != null && hits.length > 0) {// 创建批量处理请求对象BulkRequest bulkRequest = new BulkRequest();for (SearchHit hit : hits) {DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());bulkRequest.add(deleteRequest);}// 执行批量删除请求操作restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);// 构造滚动查询条件,继续滚动查询SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);scrollRequest.scroll(scroll);searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);scrollId = searchResponse.getScrollId();hits = searchResponse.getHits().getHits();}// 当前滚动查询结束,清除滚动,释放服务器内存资源ClearScrollRequest clearScrollRequest = new ClearScrollRequest();clearScrollRequest.addScrollId(scrollId);restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);LOGGER.info("清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));}LOGGER.info("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));} catch (Exception e) {LOGGER.error(String.format("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);}}
}

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/55899.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker版jxTMS使用指南:使用jxTMS采集数据之一

本文讲解了如何jxTMS的数据采集与处理框架并介绍了如何用来采集数据&#xff0c;整个系列的文章请查看&#xff1a;docker版jxTMS使用指南&#xff1a;4.4版升级内容 docker版本的使用&#xff0c;请查看&#xff1a;docker版jxTMS使用指南 4.0版jxTMS的说明&#xff0c;请查…

QT信号和槽

QT信号和槽 为了分析代码方便&#xff0c;我们要给控件改名字。要通俗易懂。 信号:信号就是指控件发出的特定的信号 槽: 槽就是槽函数的意思&#xff0c;我们可以把槽函数绑定在某一个控件的信号上。类似于中断函数 自动关联&#xff0c;信号和槽函数 手动关联

深入理解缓存 TLB 原理

今天分享一篇TLB的好文章&#xff0c;希望大家夯实基本功&#xff0c;让我们一起深入理解计算机系统。 TLB 是 translation lookaside buffer 的简称。首先&#xff0c;我们知道 MMU 的作用是把虚拟地址转换成物理地址。 MMU工作原理 虚拟地址和物理地址的映射关系存储在页表…

企业如何实现自己的AI垂直大模型

文章目录 为什么要训练垂直大模型训练垂直大模型有许多潜在的好处训练垂直大模型也存在一些挑战 企业如何实现自己的AI垂直大模型1.确定需求2.收集数据3.准备数据4.训练模型5.评估模型6.部署模型 如何高效实现垂直大模型 ✍创作者&#xff1a;全栈弄潮儿 &#x1f3e1; 个人主页…

VMWare虚拟机设置固定ip

1、在主机上刷新并重新获取ip 1、输入以下命令以释放原先的 ip 地址 #windows命令 ipconfig /release #linux/macos命令 sudo dhclient -r2、再输入以下命令以重新获取 ip 地址 #windows命令 windows&#xff1a;ipconfig /renew #linux/macos命令 sudo dhclient2、设置虚拟…

在 Linux 上以 All-in-One 模式安装 KubeSphere

官方文档&#xff1a;https://www.kubesphere.io/zh/docs/v3.3/quick-start/all-in-one-on-linux/ 操作系统 最低配置 Ubuntu&#xff1a; 16.04,18.04, 20.04, 22.04 2 核 CPU&#xff0c;4 GB 内存&#xff0c;40 GB 磁盘空间Debian Buste&#xff1a;Stretch 2 核 CPU&am…

SpringBoot+SSM实战<一>:打造高效便捷的企业级Java外卖订购系统

文章目录 项目简介项目架构功能模块管理端用户端 技术选型用户层网关层应用层数据层工具 项目优缺点结语 黑马程序员最新Java项目实战《苍穹外卖》&#xff1a;让你轻松掌握SpringBootSSM的企业级开发技巧项目简介 《苍穹外卖》是一款为餐饮企业&#xff08;餐厅、饭店&#x…

jmeter工具测试和压测websocket协议【杭州多测师_王sir】

一、安装JDK配置好环境变量&#xff0c;安装好jmeter 二、下载WebSocketSampler发送请求用的&#xff0c;地址&#xff1a;https://bitbucket.org/pjtr/jmeter-websocket-samplers/downloads/?spma2c4g.11186623.2.15.363f211bH03KeI 下载解压后的jar包放到D:\JMeter\apache-j…

K8S系列文章之 Traefik快速入门

traefik 与 nginx 一样&#xff0c;是一款优秀的反向代理工具&#xff0c;或者叫 Edge Router。至于使用它的原因则基于以下几点 无须重启即可更新配置自动的服务发现与负载均衡与 docker 的完美集成&#xff0c;基于 container label 的配置漂亮的 dashboard 界面metrics 的支…

uniapp实现支付宝菜单展开与收起

需求实现支付宝类似的效果&#xff1a; 思路&#xff1a; 1.首先建立展开收起按钮&#xff0c;这里使用的是uview里面的icon图标。 2.其次建立展开菜单内容&#xff0c;这里只演示了文本信息&#xff0c;后期引入首页应用。 3.最后写js逻辑&#xff0c;展开收起时改变盒子高度和…

Harbor部署--使用 Harbor 安装包

一、Harbor安装准备条件 这里以 harbor 2.8.3 版本为例 1.1 硬件要求 Harbor 安装对硬件资源CPU、内存和硬盘的要求如下表&#xff1a; 资源 最小要求 推荐配置 CPU 2 CPU 4 CPU Mem 4 GB 8 GB Disk 40 GB 160 GB 使用如下命令分别查看服务器的物理CPU和逻辑CPU个数…

任务12、Quality指令加持,Midjourney生成电影级数码作品

12.1 任务概述 本次实验任务旨在帮助你掌握Midjourney AI绘画中的Quality指令。通过深入介绍Quality指令的概念和作用,我们将解释为什么它在绘画中至关重要。通过测试不同的Quality参数对绘画效果的影响,并提供实战演示,你将学会如何在Midjourney中设置Quality参数以达到更…