草庐IT

查es大于10000条数据-滚动查询(scroll)

卡卡东~ 2023-08-19 原文

查es大于10000条数据-滚动查询(scroll)

背景

总所周知,es一般查询只支持最多查询出前1w条数据,很难受。想要一次性查询出你想要的数据,一些大数据的场景下,我们需要用到ElasicSearch的两种查询方式:深度分页或者滚动查询,我们今天使用的是滚动查询方式,因为需要一批次加载全部使用的数据。

介绍

深度分页

使用from和size来查询,操作比较简单,如下:

{
    "query": {
        "match_all": {}
    },
    "from": 9990,
    "size": 10
}

{
    "query": {
        "match_all": {}
    },
    "from": 9999,
    "size": 10
}

我们在获取第9999条到10009条数据的时候,每次需要将前9990、9999条都查出来,然后再向下寻找后10条。如果es还有分片存在,加载的数量就是9990*分片数量,这样查询到以后,还要排序处理,得到10条数据。。。如此一来,搜索得太深,就会造成性能问题,会耗费内存和占用cpu。

其实我们应该避免深度分页操作(限制分页页数),比如最多只能提供100页的展示,从第101页开始就没了,毕竟用户也不会搜的那么深,我们平时搜索淘宝或者百度,一般也就看个10来页就顶多了。

譬如淘宝搜索限制分页最多100页,如下:

滚动查询

通过上面可以指定,from-size不适合做离线大数据的场景,因此我们使用es提供的另一种查询大量数据的方式——滚动查询,也叫游标查询:
json处理如下:

#第一次查询:
GET /sms/_search?scroll=5m
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "userId": "9d995c0b90fe4128896a1a84eca213bf"
          }
        }
      ]
    }
  }
}
返回结果:
{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==",
  "took": 6,
      ......
}
把上一次得到的_scroll_id拿到按以下查询即可得到下一轮的数据:
GET /_search/scroll/
{
  "scroll":"1m",
  "scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="
}
这样直到把数据查完为止。

使用json查询只是做一个简单的理解,我们真正用到的还是使用语言去操作它,RestHignLevelClient就是一个很实用的es客户端,接下来我们使用java对其进行操作:导入的pom如下,需要与es版本对应:

		<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.0.0</version>
        </dependency>

直接上一个对api调用的工具类吧:❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️
其中,获得SearchResponse使用了滚动查询:具体是由以下几个模块组成的:

1、构建searchRequest

Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));

        //构建searchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            //构造器加入需要查找的字段
            sourceBuilder.fetchSource(includes, null);
        }
        //加入query语句
        sourceBuilder.query(query);
        //每次滚动的长度
        sourceBuilder.size(SIZE);
        //加入排序字段
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        //加入scroll和构造器
        request.scroll(scroll);
        request.source(sourceBuilder);

获取返回结果:

SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        //拿到第一个ScrollId(游标)
        String scrollId = searchResponse.getScrollId();
        //拿到hits结果
        SearchHit[] hits = searchResponse.getHits().getHits();
        //保存返回结果List
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);

循环滚动查询—>保存结果:

//滚动查询将SearchHit封装到result中
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    //Function<SearchHit, T>, 输入SearchHit,经过操作后,返回T结果
                    result.add(fun.apply(hit));
                }
                //说明滚动完了,返回结果即可
                if (hits.length < SIZE) {
                    break;
                }
                //继续滚动,根据上一个游标,得到这次开始查询位置
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                //得到结果
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                //定位游标
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);

util全部代码如下:

package com.yq.demo.Util;

import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ESUtil {
    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);

    private static final long SCROLL_TIMEOUT = 180000;

    private static int SIZE = 1000;

    private static int MAX_BUFFER = 209715200;

    /**
     * 构建SearchResponse
     *
     * @param client     restHighLevelClient
     * @param indices    索引
     * @param query      queryBuilder
     * @param includes   包含的字段
     * @param orderField 排序字段
     * @param order      排序类型
     * @param fun        返回函数
     * @param <T>        返回类型
     * @return List, 可以使用fun转换为T结果
     * @throws Exception
     */
    public static <T> List<T> searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query, String[] includes, String orderField, SortOrder order, Function<SearchHit, T> fun) throws Exception {
        //滚动查询的Scroll
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));

        //构建searchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            //构造器加入需要查找的字段
            sourceBuilder.fetchSource(includes, null);
        }
        //加入query语句
        sourceBuilder.query(query);
        //每次滚动的长度
        sourceBuilder.size(SIZE);
        //加入排序字段
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        //加入scroll和构造器
        request.scroll(scroll);
        request.source(sourceBuilder);
        //存储scroll的list
        List<String> scrollIdList = new ArrayList<>();
        //返回结果
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        //拿到第一个ScrollId(游标)
        String scrollId = searchResponse.getScrollId();
        //拿到hits结果
        SearchHit[] hits = searchResponse.getHits().getHits();
        //保存返回结果List
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);

        try {
            //滚动查询将SearchHit封装到result中
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    //Function<SearchHit, T>, 输入SearchHit,经过操作后,返回T结果
                    result.add(fun.apply(hit));
                }
                //说明滚动完了,返回结果即可
                if (hits.length < SIZE) {
                    break;
                }
                //继续滚动,根据上一个游标,得到这次开始查询位置
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                //得到结果
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                //定位游标
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);
            }
        } finally {
            //清理scroll,释放资源
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.setScrollIds(scrollIdList);
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        }
        return result;
    }

    /**
     * 聚合查询的SearchResponse
     * @param client
     * @param indices        索引
     * @param query QueryBuilder
     * @param aggregations  AggregationBuilder
     * @return SearchResponse
     * @throws Exception
     */

    public static SearchResponse searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query, AggregationBuilder... aggregations) throws Exception {
        //构建request请求
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        //加入Agg
        if (aggregations != null && aggregations.length > 0) {
            for (AggregationBuilder aggregation : aggregations) {
                sourceBuilder.aggregation(aggregation);
            }
        }
        sourceBuilder.size(0);
        //忽略不可用索引,只用于开放索引
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}

希望对你有所帮助,Thank you for whatching!!!😆😆😆😆😆😆

有关查es大于10000条数据-滚动查询(scroll)的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  3. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  4. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  5. sql - 查询忽略时间戳日期的时间范围 - 2

    我正在尝试查询我的Rails数据库(Postgres)中的购买表,我想查询时间范围。例如,我想知道在所有日期的下午2点到3点之间进行了多少次购买。此表中有一个created_at列,但我不知道如何在不搜索特定日期的情况下完成此操作。我试过:Purchases.where("created_atBETWEEN?and?",Time.now-1.hour,Time.now)但这最终只会搜索今天与那些时间的日期。 最佳答案 您需要使用PostgreSQL'sdate_part/extractfunction从created_at中提取小时

  6. ruby-on-rails - 如何将大于 5GB 的文件上传到 Amazon S3? - 2

    我目前正在使用带有Carrierwavegem的Rails3.2将文件上传到AmazonS3。现在我需要能够处理用户提交的大于5GB的文件,同时仍然使用Carrierwavegem。Carrierwave或Fog是否有任何其他gem或分支可以处理5GB以上的文件上传到S3?编辑:我不想重写一个完整的Rails上传解决方案,所以像这样的链接没有帮助:https://gist.github.com/908875. 最佳答案 我想出了如何做到这一点,并且现在可以正常工作了。在正确的config/environment文件中,添加以下内容以

  7. ruby-on-rails - solr 清理查询 - 2

    我在Rails上使用带有ruby​​的solr。一切正常,我只需要知道是否有任何现有代码来清理用户输入,比如以?开头的查询。或* 最佳答案 我不知道执行此操作的任何代码,但理论上可以通过查看parsingcodeinLucene来完成并搜索thrownewParseException(只有16个匹配!)。在实践中,我认为您最好只捕获代码中的任何solr异常并显示“无效查询”消息或类似信息。编辑:这里有几个“sanitizer”:http://pivotallabs.com/users/zach/blog/articles/937-s

  8. ruby-on-rails - Rails 3 在一个查询中包含多个表 - 2

    我正在为锦标赛开发一个Rails应用程序。我在这个查询中使用了三个模型:classPlayertruehas_and_belongs_to_many:tournamentsclassTournament:destroyclassPlayerMatch"Player",:foreign_key=>"player_one"belongs_to:player_two,:class_name=>"Player",:foreign_key=>"player_two"在tournaments_controller的显示操作中,我调用以下查询:Tournament.where(:id=>params

  9. ruby-on-rails - Sunspot:如何对具有不同值的多个字段进行全文查询? - 2

    我想用sunspot重现以下原始solr查询q=exact_term_text:fooORterm_textv:foo*ORalternate_text:bar*但我无法通过标准的太阳黑子界面理解这是否可能以及如何实现,因为看起来:fulltext方法似乎不接受多个文本/搜索字段参数我不知道将什么参数作为第一个参数传递给fulltext,就好像我通过了"foo"或"bar"结果不匹配如果我传递一个空参数,我得到一个q=*:*范围过滤器(例如with(:term).starting_with('foo*')(顾名思义)作为过滤器查询应用,因此不参与评分。似乎可以手动编写字符串(或者可能使

  10. ruby-on-rails - 在不重新查询数据库的情况下重新排序 Rails 中的事件记录? - 2

    例如,假设我有一个名为Products的模型,并且在ProductsController中,我有以下代码用于product_listView以显示已排序的产品。@products=Product.order(params[:order_by])让我们想象一下,在product_listView中,用户可以使用下拉菜单按价格、评级、重量等进行排序。数据库中的产品不会经常更改。我很难理解的是,每次用户选择新的order_by过滤器时,rails是否必须查询,或者rails是否能够以某种方式缓存事件记录以在服务器端重新排序?有没有一种方法可以编写它,以便在用户排序时rails不会重新查询结果

随机推荐