复制代码

为懒人提供无限可能,生命不息,code不止

人类感性的情绪,让我们知难行难
我思故我在
日拱一卒,功不唐捐
  • 首页
  • 前端
  • 后台
  • 数据库
  • 运维
  • 资源下载
  • 实用工具
  • 接口文档工具
  • 登录
  • 注册

elasticsearch高级客户端

【原创】elasticsearch scroll滚动查询-适合下载数据与内存分页场景

作者: whooyun发表于: 2023-09-14 11:34

Scroll查询:
使用场景:scroll查询主要用于处理大数据集,通常需要跨越大量文档并保留一个快照以供后续处理。
使用方式:您需要首先执行初始查询,然后使用滚动ID(scroll ID)来获取下一批数据,直到没有更多数据为止。
资源开销:scroll查询需要维护滚动上下文,因此会占用一定的资源,包括内存和CPU。滚动上下文通常会在一定时间后过期,需要重新创建。
排序:scroll查询支持基于文档ID的排序。

步骤:
1、通过scroll将所有有数据滚动查询后保存到List中(最好设置一个最大值,防止内存溢出)
2、通过每页多少条数切割List
    //构建query
    private BoolQueryBuilder createQueyBuilder(GoodsContrastConditionDto dto, CommonSortDto sortDto) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.termQuery("corp_id", dto.getCorpId()));
        if (!CollectionUtils.isEmpty(dto.getGoodsIdList())) {
            boolQuery.must(QueryBuilders.termsQuery("goods_id", dto.getGoodsIdList()));
        }
        if (!CollectionUtils.isEmpty(dto.getOrgIdList())) {
            boolQuery.must(QueryBuilders.termsQuery("own_org_id", dto.getOrgIdList()));
        }

        if (!CollectionUtils.isEmpty(dto.getGoodsCatIds())) {
            boolQuery.must(QueryBuilders.termsQuery("goods_cat_id", dto.getGoodsCatIds()));
        }

        if (!StringUtils.isEmpty(dto.getOrgId())) {
            boolQuery.must(QueryBuilders.termQuery("own_org_id", dto.getOrgId()));
        }
        if (!StringUtils.isEmpty(dto.getGoodsCatId())) {
            boolQuery.must(QueryBuilders.termQuery("goods_cat_id", dto.getGoodsCatId()));
        }

        if (!StringUtils.isEmpty(dto.getBizDtmIs())) {
            //bizDtmIs 1业务日期,2入账日期
            //业务日期数据,不需要用biz_dtm_is字段过滤数据
            if (dto.getBizDtmIs().equals(1)) {

            } else {
                //bizDtmIs 1业务日期,2入账日期
                //入账日期数据需要用biz_dtm_is字段过滤数据
                boolQuery.mustNot(QueryBuilders.termQuery("biz_dtm_is", 1));
            }
            boolQuery.must(QueryBuilders.rangeQuery("stat_dtm")
                    .gte(DateUtil.format(LocalDateTimeUtil.parse(dto.getEndDtm()), "yyyy-MM-dd HH:mm:ss"))
                    .lte(DateUtil.format(LocalDateTimeUtil.parse(dto.getStartDtm()), "yyyy-MM-dd HH:mm:ss")));
        }
        return boolQuery;
    }
	
	/**
     * 查询结果处理
     *
     * @param resultList
     * @param searchResponse
     */
    private void handHits(List<ESAdsAllGdsOrgAmt1d> resultList, SearchResponse searchResponse) {
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            // 获取文档的数据
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            // 使用json将sourceAsMap转换为UserInfo对象
            ESAdsAllGdsOrgAmt1d gds = JSON.parseObject(JSON.toJSONString(sourceAsMap), ESAdsAllGdsOrgAmt1d.class);
            resultList.add(gds);
        }
    }
	
	
    @Override
    public List<ESAdsAllGdsOrgAmt1d> goodsIdList(GoodsContrastConditionDto dto) {
        List<ESAdsAllGdsOrgAmt1d> resultList = new ArrayList();
        try {
            SearchRequest searchRequest = new SearchRequest("ads_all_gds_org_amt_1d");
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            // 构建查询
            // 使用 constant_score 查询,忽略 _score
            searchSourceBuilder.query(QueryBuilders.constantScoreQuery(this.createQueryBuilder(dto, null)));

            searchRequest.source(searchSourceBuilder);
            searchRequest.scroll(TimeValue.timeValueMinutes(1));

            // 执行初始查询
            SearchResponse firstSearchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            // 处理初始查询结果
            String scrollId = firstSearchResponse.getScrollId();
            this.handHits(resultList, firstSearchResponse);

            //处理滚动查询结果 begin
            int count = 1;
            //给一个最大值,防止死循环,撑爆内存
            while (count <= 100000) {
                // 使用滚动ID来获取下一批结果
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(TimeValue.timeValueMinutes(1));

                SearchResponse scrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);

                // 处理滚动查询结果
                // 您可以在这里访问和处理批量的文档数据
                this.handHits(resultList, scrollResponse);

                count++;
                // 判断是否已经滚动完成
                if (scrollResponse.getHits().getHits().length == 0) {
                    break;
                }

               
                scrollId = scrollResponse.getScrollId();
            }
 // 完成滚动查询后,清除滚动上下文  scrollId 滚动上下文默认1分钟,就算清理不成功也会自动过期
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                // 处理清除滚动上下文的响应

        } catch (IOException e) {
            e.printStackTrace();
        }
        return resultList;
    }