[ElasticSearch]Java API 之 滚动搜索(Scroll API)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/52810397 一般搜索请求都是返回一"页"数据,无论数据量多大都一起返回给用户,Scroll API可以允许我们检索大量数据(甚至全部数据)。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/52810397

一般搜索请求都是返回一"页"数据,无论数据量多大都一起返回给用户,Scroll API可以允许我们检索大量数据(甚至全部数据)。Scroll API允许我们做一个初始阶段搜索并且持续批量从Elasticsearch里拉取结果直到没有结果剩下。这有点像传统数据库里的cursors(游标)。

Scroll API的创建并不是为了实时的用户响应,而是为了处理大量的数据(Scrolling is not intended for real time user requests, but rather for processing large amounts of data)。从 scroll 请求返回的结果只是反映了 search 发生那一时刻的索引状态,就像一个快照(The results that are returned from a scroll request reflect the state of the index at the time that the initial search request was made, like a snapshot in time)。后续的对文档的改动(索引、更新或者删除)都只会影响后面的搜索请求。

1. 普通请求

假设我们想一次返回大量数据,下面代码中一次请求58000条数据:

 
 
  1.    /**
  2.     *  普通搜索
  3.     * @param client
  4.     */
  5.    public static void search(Client client) {
  6.        String index = "simple-index";
  7.        String type = "simple-type";
  8.        // 搜索条件
  9.        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
  10.        searchRequestBuilder.setIndices(index);
  11.        searchRequestBuilder.setTypes(type);
  12.        searchRequestBuilder.setSize(58000);
  13.        // 执行
  14.        SearchResponse searchResponse = searchRequestBuilder.get();
  15.        // 搜索结果
  16.        SearchHit[] searchHits = searchResponse.getHits().getHits();
  17.        for (SearchHit searchHit : searchHits) {
  18.            String source = searchHit.getSource().toString();
  19.            logger.info("--------- searchByScroll source {}", source);
  20.        } // for
  21.    }

运行结果:

 
 
  1. Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [58000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.]
  2. at org.elasticsearch.search.internal.DefaultSearchContext.preProcess(DefaultSearchContext.java:212)
  3. at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:103)
  4. at org.elasticsearch.search.SearchService.createContext(SearchService.java:676)
  5. at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:620)
  6. at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:371)
  7. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:368)
  8. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:365)
  9. at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
  10. at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
  11. at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
  12. at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
  13. ... 3 more

从上面我们可以知道,搜索请求一次请求最大量为[10000]。我们的请求量已经超标,因此报错,异常信息提示我们请求大数据量的情况下使用Scroll API。

2. 使用Scroll API 请求

为了使用 scroll,初始搜索请求应该在查询中指定 scroll 参数,告诉 Elasticsearch 需要保持搜索的上下文环境多长时间(滚动时间)。

 
 
  1. searchRequestBuilder.setScroll(new TimeValue(60000));

下面代码中指定了查询条件以及滚动属性,如滚动的有效时长(使用setScroll()方法)。我们通过SearchResponse对象的getScrollId()方法获取滚动ID。滚动ID会在下一次请求中使用。

 
 
  1.    /**
  2.     * 使用scroll进行搜索
  3.     * @param client
  4.     */
  5.    public static String searchByScroll(Client client) {
  6.        String index = "simple-index";
  7.        String type = "simple-type";
  8.        // 搜索条件
  9.        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
  10.        searchRequestBuilder.setIndices(index);
  11.        searchRequestBuilder.setTypes(type);
  12.        searchRequestBuilder.setScroll(new TimeValue(30000));
  13.        // 执行
  14.        SearchResponse searchResponse = searchRequestBuilder.get();
  15.        String scrollId = searchResponse.getScrollId();
  16.        logger.info("--------- searchByScroll scrollID {}", scrollId);
  17.        SearchHit[] searchHits = searchResponse.getHits().getHits();
  18.        for (SearchHit searchHit : searchHits) {
  19.            String source = searchHit.getSource().toString();
  20.            logger.info("--------- searchByScroll source {}", source);
  21.        } // for
  22.        return scrollId;
  23.        
  24.    }

使用上面的请求返回的结果中的滚动ID,这个 ID 可以传递给 scroll API 来检索下一个批次的结果。这一次请求中不用添加索引和类型,这些都指定在了原始的 search 请求中。

每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时(Each call to the scroll API returns the next batch of results until there are no more results left to return, ie the hits array is empty)。

 
 
  1.    /**
  2.     *  通过滚动ID获取文档
  3.     * @param client
  4.     * @param scrollId
  5.     */
  6.    public static void searchByScrollId(Client client, String scrollId){
  7.        TimeValue timeValue = new TimeValue(30000);
  8.        SearchScrollRequestBuilder searchScrollRequestBuilder;
  9.        SearchResponse response;
  10.        // 结果
  11.        while (true) {
  12.            logger.info("--------- searchByScroll scrollID {}", scrollId);
  13.            searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
  14.            // 重新设定滚动时间
  15.            searchScrollRequestBuilder.setScroll(timeValue);
  16.            // 请求
  17.            response = searchScrollRequestBuilder.get();
  18.            // 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时
  19.            if (response.getHits().getHits().length == 0) {
  20.                break;
  21.            } // if
  22.            // 这一批次结果
  23.            SearchHit[] searchHits = response.getHits().getHits();
  24.            for (SearchHit searchHit : searchHits) {
  25.                String source = searchHit.getSource().toString();
  26.                logger.info("--------- searchByScroll source {}", source);
  27.            } // for
  28.            // 只有最近的滚动ID才能被使用
  29.            scrollId = response.getScrollId();
  30.        } // while
  31.    }

备注:

初始搜索请求和每个后续滚动请求返回一个新的 滚动ID——只有最近的滚动ID才能被使用。(The initial search request and each subsequent scroll request returns a new_scroll_id — only the most recent _scroll_id should be used)  

我每次后续滚动请求返回的滚动ID都是相同的,所以对上面的备注,不是很懂,有明白的可以告知,谢谢。


如果超过滚动时间,继续使用该滚动ID搜索数据,则会报错:

 
 
  1. Caused by: SearchContextMissingException[No search context found for id [2861]]
  2. at org.elasticsearch.search.SearchService.findContext(SearchService.java:613)
  3. at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:403)
  4. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:384)
  5. at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:381)
  6. at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
  7. at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
  8. at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
  9. at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
  10. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  12. at java.lang.Thread.run(Thread.java:745)


3. 清除滚动ID

虽然当滚动有效时间已过,搜索上下文(Search Context)会自动被清除,但是一值保持滚动代价也是很大的,所以当我们不在使用滚动时要尽快使用Clear-Scroll API进行清除。

 
  
  1. /**
  2. * 清除滚动ID
  3. * @param client
  4. * @param scrollIdList
  5. * @return
  6. */
  7. public static boolean clearScroll(Client client, List<String> scrollIdList){
  8. ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
  9. clearScrollRequestBuilder.setScrollIds(scrollIdList);
  10. ClearScrollResponse response = clearScrollRequestBuilder.get();
  11. return response.isSucceeded();
  12. }
  13. /**
  14. * 清除滚动ID
  15. * @param client
  16. * @param scrollId
  17. * @return
  18. */
  19. public static boolean clearScroll(Client client, String scrollId){
  20. ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
  21. clearScrollRequestBuilder.addScrollId(scrollId);
  22. ClearScrollResponse response = clearScrollRequestBuilder.get();
  23. return response.isSucceeded();
  24. }



4. 参考:

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html

http://www.jianshu.com/p/14aa8b09c789

5. 说明

本代码基于ElasticSearch 2.4.1 



相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
17天前
|
Java 关系型数据库 MySQL
Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
【4月更文挑战第12天】Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
70 3
|
9天前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错之遇到java.lang.IllegalStateException: The elasticsearch emitter must be serializable.的错误,如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4天前
|
存储 Java 数据库连接
从 0 实现一个文件搜索工具 (Java 项目)
从 0 实现一个文件搜索工具 (Java 项目)
46 17
|
9天前
|
Prometheus 监控 Cloud Native
实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
17天前
|
存储 自然语言处理 搜索推荐
分布式搜索引擎ElasticSearch
Elasticsearch是一款强大的开源搜索引擎,用于快速搜索和数据分析。它在GitHub、电商搜索、百度搜索等场景中广泛应用。Elasticsearch是ELK(Elasticsearch、Logstash、Kibana)技术栈的核心,用于存储、搜索和分析数据。它基于Apache Lucene构建,提供分布式搜索能力。相比其他搜索引擎,如Solr,Elasticsearch更受欢迎。倒排索引是其高效搜索的关键,通过将词条与文档ID关联,实现快速模糊搜索,避免全表扫描。
83 3
|
17天前
|
Java API
Java操作elasticsearch
Java操作elasticsearch
18 0
|
17天前
|
SQL 监控 搜索推荐
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
51 2
|
17天前
|
人工智能 程序员 开发者
Elasticsearch 中文社区的转型后,搜索人怎么破局?
Elasticsearch 中文社区的转型后,搜索人怎么破局?
18 0
|
17天前
|
存储 自然语言处理 搜索推荐
Elasticsearch 8.10 同义词管理新篇章:引入同义词 API
Elasticsearch 8.10 同义词管理新篇章:引入同义词 API
29 0
|
17天前
|
存储 机器学习/深度学习 API
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索
42 0
高维向量搜索:在 Elasticsearch 8.X 中利用 dense_vector 的实战探索