Java elasticsearch scroll模板实现

一、scroll说明和使用场景

scroll的使用场景:大数据量的检索和操作

scroll顾名思义,就是游标的意思,核心的应用场景就是遍历 elasticsearch中的数据;

通常我们遍历数据采用的是分页,elastcisearch还支持from size的方式进行分页查询,使用 from and size 的深度分页,比如说 ?size=10&from=10000,因为 100,000 排序的结果必须从每个分片上取出并重新排序最后返回 10 条。这个过程需要对每个请求页重新进行提取+排序,效率很低,消耗很大,所以默认的最大可分页的数据是10000,超过10000是不建议的;

使用

通过在url末尾带上scroll=1m表示开启一个游标,1m表示游标的有效期为1分钟

POST /record/_search?scroll=1m
{
  "from"0,
  "size"20
}

返回结果中会把scroll的id带上,再次查询的时候,直接用scroll id查询即可

alt
POST /_search/scroll
{
    "scroll" : "1m"
    "scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAhZuYmpMbVpwWFRUMnNFMUFFSHlSMHB3AAAAAALBy_0WUWxrNTRTaWNUcy1sOHQ0VUo5dzF6dxZoemFkZTlMeFQ4MmoyOW5SUG8ybE53AAAAAAN6ip8WMmk5TWZlQ21RQnFsNURwaXRzSGhCdw==" 
}

二、基于ElasticsearchRestTemplate的实现

这里我们定义了一个template如下,主要作用就是实现一个基于scroll的数据遍历模板,屏蔽开启scroll 以及 scroll遍历所有数据,通过Consumer<T>钩子函数进行数据处理

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

import java.util.List;
import java.util.concurrent.*;

/**
 * scrollTemplate 模板,用于遍历整个Index的数据
 * @author xiuzhu
 * @Date 2023/7/28 13:12
 */

@Slf4j
public class ElasticSearchScrollTemplate<T{

    ExecutorService executorService = new ThreadPoolExecutor(14,
                                      30,TimeUnit.SECONDS,
                                      new LinkedBlockingQueue<Runnable>(5),
                                        Executors.defaultThreadFactory(),
                                        new ThreadPoolExecutor.CallerRunsPolicy()
                                    );

    ElasticsearchRestTemplate elasticSearchRestTemplate;

    Class<T> cls;

    String indexName;

    public ElasticSearchScrollTemplate(
            ElasticsearchRestTemplate template,
            Class<T> cls,
            String indexName
    )
 
{
        this.elasticSearchRestTemplate = template;
        this.cls = cls;
        this.indexName = indexName;
    }

    @FunctionalInterface
    public interface Consumer<T{
        public void accept(List<T> objects);
    }

    public void execute(Consumer<T> consumer) {
        //构建查询条件
        NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

        query.withPageable(PageRequest.of(0300));
        query.withQuery(queryBuilder);

        //保留0.5分钟
        long scrollTimeInMillis = 30*1000;

        IndexCoordinates recordIndex = IndexCoordinates.of(indexName);
        SearchScrollHits<T> hits = elasticSearchRestTemplate.searchScrollStart(scrollTimeInMillis, query.build(), cls, recordIndex);

        // scrollId
        String scrollId = hits.getScrollId();
        List<T> recordEntityList = hits.stream().map(SearchHit::getContent).toList();
        long total = 0L;

        log.info("================ began scroll index={} ====================", indexName);

        executorService.submit(()->{
            consumer.accept(recordEntityList);
        });

        total = total + recordEntityList.size();

        log.info("================  has scroll index={} total={} ====================", indexName, total);
        while (!hits.isEmpty()) {
            hits = elasticSearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, cls, recordIndex);
            List<T> entities = hits.stream().map(SearchHit::getContent).toList();

            executorService.submit(()->{
                consumer.accept(entities);
            });

            total = total + entities.size();
            try {
                //给系统留GC时间,不然容易内存溢出
                Thread.sleep(300);
            } catch (InterruptedException e) {
                log.error("sleep error", e);
            }
            log.info("================  has scroll index={} total={} ====================", indexName, total);
        }
        log.info("================ end scroll index={} ====================", indexName);
    }
}

使用参考:

@Resource(name = "elasticSearchRestTemplate")
    ElasticsearchRestTemplate elasticsearchRestTemplate;

new ElasticSearchScrollTemplate<>(
                        elasticsearchRestTemplate,
                        RecordEntity.class,
                        "record")
                ).execute((entities)->
{
                    entities.forEach(item->{
                        //这里进行数据的处理,比如修改数据
                        recordEntityService.save(item);
                        log.info("tag update success record={} api={}", item.getId());

                    });
                });

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/97537.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringMVC】工作流程及入门案例

目录 前言 回顾MVC三层架构 1. SpringMVC简介 …

linux和docker下mysql安装

目录 一、linux下mysql的安装 1.进入到/etc/yum.repos.d 2.编辑vim mysql-community.repo 3.编辑以下内容 4.保存退出&#xff0c;更新缓存yum makecache 5.下载mysql 6.启动并查看mysql状态 7.查找mysql密码 8.登陆mysql 9.密码修改参考MySQL密码修改 二、docker安…

系统错误码指示确立+日志模块手动配置

1&#xff0c;系统错误码指示确立 对于前后端分离的系统设计中&#xff0c;后端建立错误码指示对于前端非常重要可以指示错误存在地方&#xff1b;以用户注册为例&#xff1b; public interface SystemCode{int SYSTEM_USER_ERROR_ADD_FAIL 10000;int SYSTEM_USER_INFO_ADD …

Springboot + Sqlite实战(离线部署成功)

最近有个需求&#xff0c;是手机软件离线使用&#xff0c; 用的springboot mybatis-plus mysql&#xff0c;无法实现&#xff0c;于是考虑使用内嵌式轻量级的数据库SQLlite 引入依赖 <dependency><groupId>org.xerial</groupId><artifactId>sqlite-…

Logback日志记录只在控制台输出sql,未写入日志文件【解决】

原因&#xff1a;持久层框架对于Log接口实现方式不一样&#xff0c;日记记录的位置及展示方式也也不一样 mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # sql只会打印到控制台不会输出到日志文件种mybatis-plus:configuration:log-impl…

第二张微服务的调用与注册

文章目录 工程导入利用RestTemplate调用服务需求创建RestTemplate的实例到Spring容器使用RestTemplate发送请求消费者和提供者 Eureka注册中心服务远程调用会出现的问题Eureka的结构和作用Eureka的配置过程搭建注册中心服务注册服务发现 Ribbon负载均衡负载均衡原理源码跟踪总结…

iOS实时监控与报警器

在现代信息化社会中&#xff0c;即使我们不在电脑前面也能随时获取到最新的数据。而苹果公司提供的iOS推送通知功能为我们带来了一种全新的方式——通过手机接收实时监控和报警信息。 首先让我们了解一下iOS推送通知。它是一个强大且灵活可定制化程度高、适用于各类应用场景&a…

【LeetCode算法系列题解】第51~55题

CONTENTS LeetCode 51. N 皇后&#xff08;困难&#xff09;LeetCode 52. N 皇后 II&#xff08;困难&#xff09;LeetCode 53. 最大子序和&#xff08;中等&#xff09;LeetCode 54. 螺旋矩阵&#xff08;中等&#xff09;LeetCode 55. 跳跃游戏&#xff08;中等&#xff09; …

SpringMVC基础入门及工作流程---全方面详细介绍

一&#xff0c;SpringMVC概念 Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架&#xff0c;通过把Model&#xff0c;View&#xff0c;Controller分离&#xff0c;将web层进行职责解耦&#xff0c;把复杂的web应用分成逻辑清晰的几部分&#xff0c;简…

在k8s中用label控制Pod部署到指定的node上

案例-标注k8s-node1是配置了SSD的节点 kubectl label node k8s-node1 disktypessd 查看标记 测试 将pod部署到disktypessd的节点上&#xff08;这里设置了k8s-node1为ssd&#xff09; 部署后查看结果-副本全都运行在了k8s-node1上—符合预期 删除标记 kubectl label node k8…

使用 Sealos 在离线环境中光速安装 K8s 集群

作者&#xff1a;尹珉。Sealos 开源社区 Ambassador&#xff0c;云原生爱好者。 当容器化交付遇上离线环境 在当今快节奏的软件交付环境中&#xff0c;容器化交付已经成为许多企业选择的首选技术手段。在可以访问公网的环境下&#xff0c;容器化交付不仅能够提高软件开发和交付…

idea 无法识别maven的解决

问题描述 从git拉取代码或者修改文件夹以后&#xff0c;整个项目所有依赖爆红无法通过修改或者重新加载maven解决版本为idea 2021 问题定位 maven的版本太高&#xff0c;而idea的般本太低&#xff0c;导致识别的时候稳定性差 解决 使用idea原生的maven版本 选择已捆绑的m…