草庐IT

RestHighLevelClient 操作ElasticSearch

dkjhl 2023-04-10 原文

1、jar包

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8</version>
</dependency>

2、kerberos认证

public void kerberos() {
    try {
        System.setProperty("http.auth.preference", "Kerberos");
        System.setProperty("java.security.krb5.conf", KRB5CONF);
        System.setProperty("sun.security.krb5.debug", "false");
        System.setProperty("sun.security.spnego.debug", "false");
        String acceptorPrincipal = PRINCIPAL;

        Path acceptorKeyTabPath = Paths.get(KEYTAB);
        Set<String> set = new HashSet<>();
        set.add(acceptorPrincipal);
        final Subject subject = JaasKrbUtil.loginUsingKeytab(set, acceptorKeyTabPath, true); // 工具类未提供,需要单联
        Set<Object> privateCredentials = subject.getPrivateCredentials();

        log.info("getPrivateCredentials ------------------- ");
        privateCredentials.forEach(System.out::println);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public void kerberosRest(String index) {
    RestHighLevelClient restHighLevelClient = null;
    String indexName = "cool_test_one";
    String typeName = "cool_test_one_table";
    try {
        log.info("kerberos 访问 start =======================");
        log.info("开始认证");
        SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(PRINCIPAL, KEYTAB, true); // 工具类未提供,需要单联
        log.info("认证成功 ===================================");

        // 业务逻辑开始
        List<HttpHost> hosts = new ArrayList<>();
        HttpHost hostNew = new HttpHost("localhost", 9200, "http");
        hosts.add(hostNew);
        HttpHost[] httpHosts = hosts.toArray(new HttpHost[0]);

        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);

        /** options start **/

        
        // 异步连接延时配置 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.12/_timeouts.html
        // RequestConfig有三个超时如下
        int connectTimeout = 5000; // 设置连接超时时间,单位毫秒。指的是连接一个url的连接等待时间
        int socketTimeout = 5000; // 请求获取数据的超时时间,单位毫秒。 如果访问一个接口,多少时间内无法返回数据,就直接放弃此次调用。指的是连接上一个url,获取response的返回等待时间。
        int connectionRequestTimeout = 5000; // 设置从connect Manager获取Connection 超时时间,单位毫秒。这个属性是新加的属性,因为目前版本是可以共享连接池的。
        //配置请求超时超时,分为 连接超时(默认1s) 和 套接字超时(默认30s)
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeout);//配置连接超时时间
            requestConfigBuilder.setSocketTimeout(socketTimeout);//配置套接字超时时间
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);//获取连接的超时时间
            return requestConfigBuilder;
        });

        // 异步连接数配置
        int maxConnectNum = 100; // 最大连接数
        int maxConnectPerRoute = 100; // 最大路由连接数
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
            return httpClientBuilder;
        });

        /** options end **/

        restHighLevelClient = new RestHighLevelClient(restClientBuilder);

        log.info("operation start =========================================");

        singleIndex(restHighLevelClient, indexName, typeName); // 单条文档写入
        bulkIndex(restHighLevelClient, indexName, typeName); // 批量文档写入

        singleUpdate(restHighLevelClient, indexName, typeName); // 单条文档更新
        bulkUpdate(restHighLevelClient, indexName, typeName); // 批量文档更新

        singleUpsert(restHighLevelClient, indexName, typeName); // 单条文档upsert
        bulkUpsert(restHighLevelClient, indexName, typeName); // 批量文档upsert

        bulkUpdateNoDocId(restHighLevelClient, indexName, typeName);
        bulkUpsertNoDocId(restHighLevelClient, indexName, typeName);

        singleDelete(restHighLevelClient, indexName, typeName);
        log.info("operation end ===========================================");

        // 测试获取所有的索引
        log.info("获取" + indexName + " 索引数据");
        getIndex(restHighLevelClient, indexName);
        log.info("kerberos 访问 end");

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (restHighLevelClient != null) {
            try {
                restHighLevelClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3、写入操作

1)根据_id单条文档写入

单条文档写入需要创建IndexRequest对象,设置索引名称,类型名称,id名称,以及使用source传入文档字段的Map对象,在执行写入时使用客户端的index方法把IndexRequest传入即可

public String singleIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    builder.field("a1", "bb");
    builder.field("a2", 100);
    builder.field("a3", 10.12);
    builder.field("a4", "2055-05-05");
    builder.field("a5", "20:55:55");
    builder.field("a6", "2055-05-05");
    builder.endObject();

    String docIdStr = Md5Util.encode("bb");
    log.info("doc_id ===>" + docIdStr);

    IndexRequest request = new IndexRequest(indexName, typeName, docIdStr).source(builder);
    IndexResponse response = null;

    try {
        // 不使用默认的RequestOptions.DEFAULT,而通过使用自定义RequestOptions的方式(ES官方api已经给我们开放出来了):
        RequestOptions.Builder buildersize = RequestOptions.DEFAULT.toBuilder();
        buildersize.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory
                        //修改为500MB
                        .HeapBufferedResponseConsumerFactory(500 * 1024 * 1024));
        //             response = restHighLevelClient.index(request, RequestOptions.DEAFULT);


        response = restHighLevelClient.index(request, buildersize.build());
    } catch (IOException e) {
        e.printStackTrace();
    }
    return response.getId();
}

2)批量写入文档

批量写入需要创建BulkRequest,多条数据每条创建一个IndexRequest对象设置index,type,id和数据Map,将这些IndexRequest对象条件到BulkRequest中,调用客户端的bulk方法执行即可

public static void bulkIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();

    Map<String, Object> data = new HashMap<>();
    Object colData = "kugou";
    data.put("a1", String.valueOf(colData));
    data.put("a2", 100);
    data.put("a3", 10.12);
    data.put("a4", "2055-05-05");
    data.put("a5", "20:55:55");
    data.put("a6", "2055-05-05");

    for (int i = 0; i < 2; i++) {
        String docIdStr = Md5Util.encode("bb" + i);
        log.info("doc_id =>" + docIdStr);

        // 有doc_id
        IndexRequest indexRequest = new IndexRequest(indexName, typeName, docIdStr).source(data);
        // 无doc_id
//            IndexRequest indexRequest1 = new IndexRequest(indexName, typeName, docIdStr).source(data);

        bulkRequest.add(indexRequest);
    }

    BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    log.info("bulk write result is " + !response.hasFailures());
}

 3)更新单条文档

更新单条文档需要创建UpdateRequest对象,设置index,type,id和文档字段数据,调用客户端的update方法执行即可

public static void singleUpdate(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {

        Map<String, Object> data = new HashMap<>();
        Object colData = "kugou";
        data.put("a1", String.valueOf(colData));
        data.put("a2", 100);
        data.put("a3", 10.12);
        data.put("a4", "2055-05-05");
        data.put("a5", "20:55:55");
        data.put("a6", "2055-05-05");

        String docIdStr = Md5Util.encode("kugou");

        UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);

        DocWriteResponse.Result result = updateResponse.getResult();
}

4)批量文档更新

public static void bulkUpdate(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();

    Map<String, Object> data = new HashMap<>();
    Object colData = "test";
    data.put("a1", String.valueOf(colData));
    data.put("a2", 100);
    data.put("a3", 10.12);
    data.put("a4", "2055-05-05");
    data.put("a5", "20:55:55");
    data.put("a6", "2055-05-05");

    for (int i = 0; i < 2; i++) {
        String docIdStr = Md5Util.encode("bb" + i);
        log.info("doc_id =>" + docIdStr);

        UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
        updateRequest.doc(data);
        bulkRequest.add(updateRequest);
    }

    BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    log.info("bulk write result is " + !response.hasFailures());
}

5)upsert方式

对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置以下upsert即可,其他一样

public static void singleUpsert(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    // 对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置以下upsert即可,其他一样
    Map<String, Object> data = new HashMap<>();
    Object colData = "kugou";
    data.put("a1", String.valueOf(colData));
    data.put("a2", 100);
    data.put("a3", 10.12);
    data.put("a4", "2055-05-05");
    data.put("a5", "20:55:55");
    data.put("a6", "2055-05-05");

    /**
     * 测试过程中,如果es定义的字段数为6个;先index的时候,字段数为5个;upsert的时候,字段数6个,结果是success
     */

    String docIdStr = Md5Util.encode("kugou");

    UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
    updateRequest.doc(data).upsert(data);
    UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
}

public static void bulkUpsert(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();

    Map<String, Object> data = new HashMap<>();
    Object colData = "test";
    data.put("a1", String.valueOf(colData));
    data.put("a2", 100);
    data.put("a3", 10.12);
    data.put("a4", "2055-05-05");
    data.put("a5", "20:55:55");
    data.put("a6", "2055-05-05");

    for (int i = 0; i < 2; i++) {
        String docIdStr = Md5Util.encode("bb" + i);
        log.info("doc_id =>" + docIdStr);

        UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
        updateRequest.doc(data).upsert(data); // 对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置以下upsert即可,其他一样
        bulkRequest.add(updateRequest);
    }

    BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    log.info("bulk write result is " + !response.hasFailures());
}

6)多种操作类型混合操作

public static void bulkMixOperation(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
    Map<String, Object> data = new HashMap<>();
    Object colData = "index_id";
    data.put("a1", String.valueOf(colData));
    data.put("a2", 100);
    data.put("a3", 10.12);
    data.put("a4", "2055-05-05");
    data.put("a5", "20:55:55");
    data.put("a6", "2055-05-05");
    String index_doc_id = Md5Util.encode(String.valueOf(colData));

    Map<String, Object> data1 = new HashMap<>();
    Object colData1 = "upsert_id";
    data1.put("a1", String.valueOf(colData1));
    data1.put("a2", 100);
    data1.put("a3", 10.12);
    data1.put("a4", "2055-05-05");
    data1.put("a5", "20:55:55");
    data1.put("a6", "2055-05-05");
    String upsert_doc_id = Md5Util.encode(String.valueOf(colData1));

    Map<String, Object> data2 = new HashMap<>();
    Object colData2 = "update_id";
    data2.put("a1", String.valueOf(colData1));
    data2.put("a2", 100);
    data2.put("a3", 10.12);
    data2.put("a4", "2055-05-05");
    data2.put("a5", "20:55:55");
    data2.put("a6", "2055-05-05");
    String update_doc_id = Md5Util.encode(String.valueOf(colData2));

    BulkRequest bulkRequest = new BulkRequest();
    IndexRequest indexRequest = new IndexRequest(indexName, typeName, index_doc_id).source(data);
    UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, upsert_doc_id).doc(data1).upsert(data1);
    UpdateRequest updateRequest1 = new UpdateRequest(indexName, typeName, update_doc_id).doc(data2);

    bulkRequest.add(indexRequest);
    bulkRequest.add(updateRequest);
    bulkRequest.add(updateRequest1);

    BulkResponse result = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

    log.info("mix bulk result is " + !result.hasFailures());

}

7)listMap构造数据插入ES

public static void ListMapIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        List<Map<String, Object>> list = new ArrayList<>();
        list.add(new HashMap<String, Object>() {{
            put("id", "003");
            put("name", "c宾馆");
            put("city", "苏州");
            put("price", 7.54);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "004");
            put("name", "d宾馆");
            put("city", "杭州");
            put("price", 17.34);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "005");
            put("name", "e宾馆");
            put("city", "上海");
            put("price", 21.92);
        }});

        BulkRequest bulkRequest = new BulkRequest();
        list.forEach(s -> bulkRequest.add(new IndexRequest().index(indexName).type(typeName).id(s.get("id").toString()).source(s)));
        bulkRequest.timeout(TimeValue.timeValueSeconds(5));
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        log.info("result is " + !bulkResponse.hasFailures());
        restHighLevelClient.close();
    }

8)根据条件批量更新文档

创建UpdateByQueryRequest对象,分别设置setQuerysetScript,分别代表条件和更新语句,最后客户端调用updateByQuery更新

public static void updateByQuery(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName, typeName);
        updateByQueryRequest.setQuery(new TermQueryBuilder("city", "上海"));
        updateByQueryRequest.setScript(new Script("ctx._source['city']='杭州'"));
        restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }

9)删除单条文档

删除单条文档使用DeleteRequest对象,传入index,type,doc_id,客户端调用delete方法即可

public static void singleDelete(RestHighLevelClient restHighLevelClient, String indexName, String typeName) {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
        deleteByQueryRequest.indices(indexName);
        deleteByQueryRequest.setQuery(new MatchQueryBuilder("a1", "hello"));
        try {
            restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

10)批量删除文档

批量删除使用BulkRequest对象,将DeleteRequest传入BulkRequest中,最后调用客户端的bulk提交即可

public static void bulkDelete(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new DeleteRequest("hotel", "_doc", "004"));
        bulkRequest.add(new DeleteRequest("hotel", "_doc", "003"));
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

11)根据条件删除文档

使用deleteByQueryRequest对象,设置setQuery参数为条件,客户端调用deleteByQuery即可

    public static void deleteByQuery(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        deleteByQueryRequest.setQuery(new BoolQueryBuilder().mustNot(new TermQueryBuilder("city", "上海")));
        restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

12)搜索文档操作

/**
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void searchIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("city", "上海"));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        RestStatus restStatus = searchResponse.status();
        System.out.println(restStatus);
        if (restStatus == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println("id:" + searchHit.getId());
                System.out.println("index:" + searchHit.getIndex());
                System.out.println("score:" + searchHit.getScore());
                Map<String, Object> map = searchHit.getSourceAsMap();
                System.out.println("name:" + (String) map.get("name"));
                System.out.println("city:" + (String) map.get("city"));
                System.out.println("price:" + (Double) map.get("price"));
            }
        }
        restHighLevelClient.close();
    }

    /**
     * term精确搜索
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void termSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", "007"));
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            Object city = searchHits.getAt(0).getSourceAsMap().get("city");
            System.out.println(String.valueOf(city));
        }

        restHighLevelClient.close();
    }

    /**
     * range范围搜索
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void rangeSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println(searchHit.getSourceAsMap());
            }
        }

        restHighLevelClient.close();
    }

    /**
     * 分页查询
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void pageSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(1).size(1).query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println(searchHit.getSourceAsMap());
            }
        }
        restHighLevelClient.close();
    }

    /**
     * 这种常用于根据筛选条件之后抽取全部数据的场景,
     * scroll API 可以被用来检索大量的结果, 甚至所有的结果 ,
     * 注意es的游标查询的是当下时刻的数据快照,
     * 即在游标查询之后的数据的变动不会影响游标查询的结果,
     * 默认游标查询根据_doc字段进行排序
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void cusorSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(5.0).lte(100.0));
        searchSourceBuilder.size(2);
        searchRequest.source(searchSourceBuilder);
        Scroll scroll = new Scroll(timeValueMillis(1L));
        /**
         *
         * 在搜索条件之后使用searchSourceBuilder.size(2)设置了每次游标只抽取2条数据,
         * 设置每次游标的超时时间是1毫秒timeValueMillis,可以适当调高超时时间防止由于超时还没查完导致游标提前结束。
         * 在执行游标的时候,第一次使用了客户端的search方法,从第二次开始使用scroll方法,
         * 每开始下一次游标的时候都通过查看本次游标的结果是否为空searchResponse.getHits().getHits()来判断是否还要继续,
         * 把每次游标的返回结果收集起来拿到全部数据
         *
         */
        searchRequest.scroll(scroll);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<SearchHit> resultSearchHit = new ArrayList<>();
        while (hits != null && hits.length > 0) {
            System.out.println(hits.length);
            System.out.println(scrollId);
            resultSearchHit.addAll(Arrays.asList(hits));
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchScrollRequest.scroll(scroll);
            SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
            scrollId = searchScrollResponse.getScrollId();
            hits = searchScrollResponse.getHits().getHits();
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }

    /**
     * 返回指定字段
     * 设置fetchSource参数,传入两个数组,前一个是包含的字段,后一个是排除的字段
     *
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void specColSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("_id", "001"))
                .fetchSource(new String[]{"city"}, new String[]{});
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            System.out.println(searchHits.getAt(0));
        }

        restHighLevelClient.close();
    }

    /**
     * 排序
     * 排序在SearchSourceBuilder对象后构建sort参数,通过SortOrder.DESC倒序列和SortOrder.ASC升序
     * @param restHighLevelClient
     * @param indexName
     * @param typeName
     * @throws IOException
     */
    public static void sortSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName, typeName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().sort("price", SortOrder.DESC);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println(searchHit);
            }
        }

        restHighLevelClient.close();
    }

有关RestHighLevelClient 操作ElasticSearch的更多相关文章

  1. ruby - 如何使用 Selenium Webdriver 根据 div 的内容执行操作? - 2

    我有一个使用SeleniumWebdriver和Nokogiri的Ruby应用程序。我想选择一个类,然后对于那个类对应的每个div,我想根据div的内容执行一个Action。例如,我正在解析以下页面:https://www.google.com/webhp?sourceid=chrome-instant&ion=1&espv=2&ie=UTF-8#q=puppies这是一个搜索结果页面,我正在寻找描述中包含“Adoption”一词的第一个结果。因此机器人应该寻找带有className:"result"的div,对于每个检查它的.descriptiondiv是否包含单词“adoption

  2. ruby-on-rails - 如何处理 Grape 中特定操作的过滤器之前? - 2

    我正在我的Rails项目中安装Grape以构建RESTfulAPI。现在一些端点的操作需要身份验证,而另一些则不需要身份验证。例如,我有users端点,看起来像这样:moduleBackendmoduleV1classUsers现在如您所见,除了password/forget之外的所有操作都需要用户登录/验证。创建一个新的端点也没有意义,比如passwords并且只是删除password/forget从逻辑上讲,这个端点应该与用户资源。问题是Grapebefore过滤器没有像except,only这样的选项,我可以在其中说对某些操作应用过滤器。您通常如何干净利落地处理这种情况?

  3. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  4. ruby - 在 Ruby 中是否有一种惯用的方法来操作 2 个数组? - 2

    a=[3,4,7,8,3]b=[5,3,6,8,3]假设数组长度相同,是否有办法使用each或其他一些惯用方法从两个数组的每个元素中获取结果?不使用计数器?例如获取每个元素的乘积:[15,12,42,64,9](0..a.count-1).eachdo|i|太丑了...ruby1.9.3 最佳答案 使用Array.zip怎么样?:>>a=[3,4,7,8,3]=>[3,4,7,8,3]>>b=[5,3,6,8,3]=>[5,3,6,8,3]>>c=[]=>[]>>a.zip(b)do|i,j|c[[3,5],[4,3],[7,6],

  5. ruby-on-rails - 如何让 Rails View 返回其关联的操作名称? - 2

    我有一个非常简单的Controller来管理我的Rails应用程序中的静态页面:classPagesController我怎样才能让View模板返回它自己的名字,这样我就可以做这样的事情:#pricing.html.erb#-->"Pricing"感谢您的帮助。 最佳答案 4.3RoutingParametersTheparamshashwillalwayscontainthe:controllerand:actionkeys,butyoushouldusethemethodscontroller_nameandaction_nam

  6. ruby - Rails Elasticsearch 聚合 - 2

    不知何故,我似乎无法获得包含我的聚合的响应...使用curl它按预期工作:HBZUMB01$curl-XPOST"http://localhost:9200/contents/_search"-d'{"size":0,"aggs":{"sport_count":{"value_count":{"field":"dwid"}}}}'我收到回复:{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":90,"max_score":0.0,"hits":[]},"a

  7. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  8. Postman测试简单操作 - 2

    1、接口请求基本操作1.1例子tips在view的选项可以zoomin调整窗口字帖大小。1、创建一个测试的workspace,并命名为test2、test后面新增一个addrequest3、选择发送GET,URL为一个开源的https://api.apiopen.top/api/sentences获取每日一句4、点击send查看内容Tips:如果提示出现Error:tunnelingsocketcouldnotbeestablished,statusCode=407错误,参照以下解决办法)关于tunnelingsocketcouldnotbeestablished,cause=getaddri

  9. 【Linux操作系统】——网络配置与SSH远程 - 2

    Linux操作系统——网络配置与SSH远程安装完VMware与系统后,需要进行网络配置。第一个目标为进行SSH连接,可以从本机到VMware进行文件传送,首先需要进行网络配置。1.下载远程软件首先需要先下载安装一款远程软件:FinalShell或者xhell7FinalShellxhell7FinalShell下载:Windows下载http://www.hostbuf.com/downloads/finalshell_install.exemacOS下载http://www.hostbuf.com/downloads/finalshell_install.pkg2.配置CentOS网络安装好

  10. ruby - Ruby 语言可以用来构建操作系统吗? - 2

    Ruby语言是否可以用于创建全新的移动操作系统或桌面操作系统,即是否可以用于系统编程? 最佳答案 嗯,现在有一些操作系统使用比C更高级的语言。基本上,ruby解释器本身需要用一些低级的东西来编写,并且需要一些引导加载代码将功能齐全的ruby​​解释器作为独立内核加载到内存中。一旦ruby​​解释器被引导并以内核模式(或innerrings之一)运行,就没有什么可以阻止您在其上构建整个操作系统。不幸的是,它可能会很慢。每个操作系统功能的垃圾收集可能会相当引人注目。ruby解释器将负责任务调度和网络堆栈等基本事情,使用垃圾收集框架会大大

随机推荐