Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**

  • 清理ES历史数据定时任务
    */
    @Component
    public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**

    • 根据索引名称删除当前日期两个月前的那一天的历史文档数据
    • @param jobContext
      */
      @Scheduled
      public void cleanESHistoryData(JobContext jobContext) {
      // jobContext为定时任务中回传数据
      String indexName = jobContext.getData();
      if (StringUtils.isBlank(indexName)) {
      LOGGER.warn(“ES索引名称不能为空!”);
      return;
      }
      long startTimeMillis = System.currentTimeMillis();
      String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
      try {
      String startTimeStr = twoMonthsAgoDate + " 00:00:00";
      // 初始化时间,形如2023-08-06 00:00:00
      Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
      // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
      for (int i = 0; i < 24; i++) {
      Date startTime = initialStartTime;
      startTime = DateUtils.addHours(startTime, i);
      Date endTime = DateUtils.addHours(startTime, 1);
      LOGGER.info(“正在清理索引:[{}],时间:{} 至 {}的历史文档数据…”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
      long currentStartTimeMillis = System.currentTimeMillis();
      // 指定操作的索引库
      SearchRequest searchRequest = new SearchRequest(indexName);
      // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery(“createTimeStr.keyword”)
      .from(DateTool.format(startTime, DF_FULL))
      .to(DateTool.format(endTime, DF_FULL)))
      .size(1000);
      // 设置滚动查询结果在内存中的过期时间为1min
      Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
      // 将滚动以及构造的查询条件放入查询请求
      searchRequest.scroll(scroll).source(searchSourceBuilder);
      SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      // 记录要滚动的ID
      String scrollId = searchResponse.getScrollId();
      SearchHit[] hits = searchResponse.getHits().getHits();
      while (hits != null && hits.length > 0) {
      // 创建批量处理请求对象
      BulkRequest bulkRequest = new BulkRequest();
      for (SearchHit hit : hits) {
      DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
      bulkRequest.add(deleteRequest);
      }
      // 执行批量删除请求操作
      restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      // 构造滚动查询条件,继续滚动查询
      SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
      scrollRequest.scroll(scroll);
      searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
      scrollId = searchResponse.getScrollId();
      hits = searchResponse.getHits().getHits();
      }
      // 当前滚动查询结束,清除滚动,释放服务器内存资源
      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      clearScrollRequest.addScrollId(scrollId);
      restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      LOGGER.info(“清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
      }
      LOGGER.info(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
      } catch (Exception e) {
      LOGGER.error(String.format(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
      }
      }
      }

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/84009.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【操作系统原理】计算机系统概述

计算机系统概述 操作系统运行环境 用户程序执行____指令发起系统调用&#xff0c;请求操作系统提供服务&#xff0c;这一过程中系统通过____机制从用户态进入核心态。 【答&#xff1a;访管指令(trap)指令&#xff0c;硬件中断】 访管指令是在用户态使用的&#xff0c;并不是…

JavaScript——为什么静态方法不能调用非静态方法

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

TensorFlow中slim包的具体用法

TensorFlow中slim包的具体用法 1、训练脚本文件&#xff08;该文件包含数据下载打包、模型训练&#xff0c;模型评估流程&#xff09;3、模型训练1、数据集相关模块&#xff1a;2、设置网络模型模块3、数据预处理模块4、定义损失loss5、定义优化器模块 本次使用的TensorFlow版本…

MathType7MAC中文版数学公式编辑器下载安装教程

如今许多之前需要手写的内容都可以在计算机中完成了。以前我们可以通过word输入一些简单的数学公式&#xff0c;但现在通过数学公式编辑器便可以完成几乎所有数学公式的写作。许多简单的数学公式&#xff0c;我们可以使用输入法一个个找到特殊符号并输入&#xff0c;但是对于高…

LeetCode--HOT100题(42)

目录 题目描述&#xff1a;108. 将有序数组转换为二叉搜索树&#xff08;简单&#xff09;题目接口解题思路代码 PS: 题目描述&#xff1a;108. 将有序数组转换为二叉搜索树&#xff08;简单&#xff09; 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xf…

单片机学习-蜂鸣器电子元件

蜂鸣器是有什么作用的&#xff1f; 蜂鸣器 是 一种 一体化结构 的电子训响器&#xff0c;可以发出声音的电子元器件 蜂鸣器分类&#xff1f; ①压电式蜂鸣器&#xff08;图左&#xff09; 称&#xff1a; 无源蜂鸣器 ②电磁式蜂鸣器&#xff08;图右&#xff09; 称&#xf…

Android平台RTMP|RTSP直播播放器功能进阶探讨

我们需要怎样的直播播放器&#xff1f; 很多开发者在跟我聊天的时候&#xff0c;经常问我&#xff0c;为什么一个RTMP或RTSP播放器&#xff0c;你们需要设计那么多的接口&#xff0c;真的有必要吗&#xff1f;带着这样的疑惑&#xff0c;我们今天聊聊Android平台RTMP、RTSP播放…

MAVEN利器:一文带你了解IDEA中如何使用Maven

前言&#xff1a; 强大的构建工具——Maven。作为Java生态系统中的重要组成部分&#xff0c;Maven为开发人员提供了一种简单而高效的方式来构建、管理和发布Java项目。无论是小型项目还是大型企业级应用&#xff0c;Maven都能帮助开发人员轻松处理依赖管理、编译、测试和部署等…

17.4 【Linux】systemctl 针对 timer 的配置文件

有时候&#xff0c;某些服务你想要定期执行&#xff0c;或者是开机后执行&#xff0c;或者是什么服务启动多久后执行等等的。在过去&#xff0c;我们大概都是使用 crond 这个服务来定期处理&#xff0c; 不过&#xff0c;既然现在有一直常驻在内存当中的 systemd 这个好用的东西…

neo4jd3拓扑节点显示为节点标签(自定义节点显示)

需求描述&#xff1a;如下图所示&#xff0c;我的拓扑图中有需要不同类型的标签节点&#xff0c;我希望每个节点中显示的是节点的标签 在官方示例中&#xff0c;我们可以看到&#xff0c;节点里面是可以显示图标的&#xff0c;现在我们想将下面的图标换成我们自定义的内容 那…

基于Python3 的 简单股票 可转债 提醒逻辑

概述 通过本地的定时轮训&#xff0c;结合本地建议数据库。检查股票可转债价格的同事&#xff0c;进行策略化提醒 详细 前言 为什么会有这么个东西出来呢&#xff0c;主要是因为炒股软件虽然有推送&#xff0c;但是设置了价格之后&#xff0c;看到推送也未必那么及时&#…

SpringCloud学习笔记(四)_ZooKeeper注册中心

基于Spring Cloud实现服务的发布与调用。而在18年7月份&#xff0c;Eureka2.0宣布停更了&#xff0c;将不再进行开发&#xff0c;所以对于公司技术选型来说&#xff0c;可能会换用其他方案做注册中心。本章学习便是使用ZooKeeper作为注册中心。 本章使用的zookeeper版本是 3.6…