草庐IT

Elasticsearch:在 Java 应用中创建 mappings,批量写入及更新 - Java client 8.x

Elastic 中国社区官方博客 2023-04-13 原文

在我之前的文章 “Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”,我详细地描述了如何在 Java 客户端应用中创建一个索引并对它进行搜索。在那个例子里,我们并没有描述如何创建 mappings。最近,我看到有开发者在评论区里留言想知道如何创建 mappings 并使用 _bulk 来进行批量写入及更新。今天的文章,我是继先前的文章 “Elasticsearch:使用 Elasticsearch Java client 8.0 来连接带有 HTTPS 的集群” 来进行的。在 Elastic Stack 8.x 平台,开始引入 HTTPS 的访问,所以前面的那篇文章是最好的开始。

我将使用 Elastic Stack 8.2 老进行展示。为了方便大家的学习,你可以在地址 GitHub - liu-xiao-guo/ElasticsearchJava-mapping-bulk8 下载下面所讲内容的源码。关于 Java client API 的查找,你可以参考链接 Overview (java-client 8.2.2 API) 

如何在 Java 应用中创建 mappings 及进行批量写入

如何在 Java 应用中创建 mappings 及进行批量写入_哔哩哔哩_bilibili

创建 mappings

其实在最新的 Java 客户端 API 中,它的使用非常像极了我们常见的在 console 中的命令。比如,我们可以使用如下的命令来创建一个索引:

PUT test
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "price": {
        "type": "long"
      }
    }
  }
}

显然在上面,我们可以看见有一个叫做 mappings 的字段,当然它是操作与一个索引 test 上的。很自然的,在使用请求的时候,我们需要创建 mappings 这个定义。

首先,我们来把代码直接贴出来:

        ElasticsearchIndicesClient indices = client.indices();


        // Firstly remove "products" if it exists
        try {
            DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder()
                    .index("products")
                    .build();
            DeleteIndexResponse delete_response = indices.delete(delete_request);
            System.out.println(delete_response.acknowledged());

        } catch (Exception e) {
            // e.printStackTrace();
        }

        // Secondly remove "test" if it exists
        try {
            DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder()
                    .index("test")
                    .build();
            DeleteIndexResponse delete_response = indices.delete(delete_request);
            System.out.println(delete_response.acknowledged());

        } catch (Exception e) {
            // e.printStackTrace();
        }

在上面,它们相当于如下的指令:

DELETE products
DELETE test

为了确保下面的命令成功,我们首先删除已经创建过的 products 及 test 索引,如果它们之前已经被创建过。这是一个很简单的操作,就是发送一个 DELETE 指令。

接下来,我们在一个文件中定义一个如下的 mappings:

mappings.json

{
  "properties" : {
    "id" : {
      "type" : "keyword"
    },
    "name" : {
      "type" : "text",
      "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
      }
    },
    "price" : {
      "type" : "long"
    }
  }
}

这是我们想要的一个 mapping。它定义了索引中想要的字段的类型。

我们接着使用如下的命令来创建一个叫做 test 的索引:

       String mappingPath = System.getProperty("user.dir") + "/mappings.json";
        JsonpMapper mapper = client._transport().jsonpMapper();
        String mappings_str = new String(Files.readAllBytes(Paths.get(mappingPath)));
        System.out.println("mappings are: " +  mappings_str);
        JsonParser parser = mapper.jsonProvider()
                .createParser(new StringReader( mappings_str ));

        client.indices()
                .create(createIndexRequest -> createIndexRequest.index("test")
                        .mappings(TypeMapping._DESERIALIZER.deserialize(parser, mapper)));

首先,我们读入 mappings.json 文件,并使用 client.indices 来创建 test 索引。显然和我们的 console 中的命令很相似。从一个叫做 test 的索引中创建一个 mappings。

当我们成功运行上面的命令后,我们可以在 Kibana 中进行查看:

GET test/_mapping

它的响应为:

{
  "test" : {
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "keyword"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "price" : {
          "type" : "long"
        }
      }
    }
  }
}

接下来,我们使用另外一种方法来创建 mappings:

        String mappings = "{\n" +
                "  \"properties\" : {\n" +
                "    \"id\" : {\n" +
                "      \"type\" : \"keyword\" \n" +
                "    },\n"+
                "    \"name\" : {\n" +
                "      \"type\" : \"text\",\n" +
                "      \"fields\" : {\n" +
                "        \"keyword\" : {\n" +
                "          \"type\" : \"keyword\",\n" +
                "          \"ignore_above\" : 256 \n" +
                "        }\n" +
                "      } \n" +
                "    }, \n" +
                "    \"price\" : { \n" +
                "      \"type\" : \"long\" \n" +
                "     } \n" +
                "  }\n" +
                "}\n";

        System.out.println( "mappings are: " + mappings );
        JsonpMapper mapper1 = client._transport().jsonpMapper();
        JsonParser parser1 = Json.createParser(new StringReader(mappings));
        CreateIndexRequest request_create =  new CreateIndexRequest.Builder()
                .index("products")
                .mappings(TypeMapping._DESERIALIZER.deserialize(parser1, mapper1))
                .build();
        CreateIndexResponse response_create = indices.create(request_create);
        System.out.println(response_create.acknowledged());

在上面,我使用了一个字符串 mappings 来定义索引 products 的 mappings。我们可以通过打印的方法来检查字符串的输出是否正确。我们必须确保这个字符串输出和我们在 console 里的一致性,否则会造成错误。接下来的代码和之前的一样。

当我们成功运行上面的代码后,我们可以在 Kibana 中进行查看:

GET products/_mapping

它的响应是:

{
  "products" : {
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "keyword"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "price" : {
          "type" : "long"
        }
      }
    }
  }
}

显然它是按照我们的需求来创建的 mappings。

使用 _bulk 来进行批量写入

在我们写入的时候,_bulk 指令可以一次性大量写入我们的文档,从而提高效率。那么我们该如何在 Java 里进行实现呢?

        Product prod1 = new Product("prod1", "washing machine", 42);
        Product prod2 = new Product("prod2", "TV", 42);

        List<Product> products = new ArrayList<Product>();
        products.add( prod1 );
        products.add( prod2 );

        BulkRequest.Builder br = new BulkRequest.Builder();
        for (Product product : products) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index("products")
                            .id(product.getId())
                            .document(product)
                    )
            );
        }

        BulkResponse result = client.bulk(br.refresh(Refresh.WaitFor).build());

        if (result.errors()) {
            System.out.println("Bulk had errors");
            for (BulkResponseItem item: result.items()) {
                if (item.error() != null) {
                    System.out.println(item.error().reason());
                }
            }
        }

在上面,当我写入的时候,我们强制使用 refresh 以使得写入的文档在紧接下来的操作中变为可以搜索的。关于 refresh 的操作,你可以阅读我的另外一篇文章 “Elasticsearch:Elasticsearch 中的 refresh 和 flush 操作指南”。实际上它的操作也非常简单。如上面的代码所示,我们首先创建两个文档的列表,然后使用 BulkRequest 来创建请求。在请求中,我们使用了 index 的操作。上面的这个操作非常像如下的这个命令:

POST _bulk
{ "index" : { "_index" : "produtcs", "_id" : "prod1" } }
{ "id" : "prod1", "name": "washing machine", "price": 42 }
{ "index" : { "_index" : "produtcs", "_id" : "prod2" } }
{ "id" : "prod2", "name": "TV", "price": 42 }

运行上面的命令后,我们执行如下的命令来进行查看:

GET products/_search?filter_path=**.hits

上面命令显示的结果为:

{
  "hits" : {
    "hits" : [
      {
        "_index" : "products",
        "_id" : "prod1",
        "_score" : 1.0,
        "_source" : {
          "id" : "prod1",
          "name" : "washing machine",
          "price" : 42
        }
      },
      {
        "_index" : "products",
        "_id" : "prod2",
        "_score" : 1.0,
        "_source" : {
          "id" : "prod2",
          "name" : "TV",
          "price" : 42
        }
      }
    ]
  }
}

显然我们的写入是成功的,并且是一次请求,同时写入了两个文档。在实际的使用中,我们可以写入成百上千的文档。

使用 update by query 进行批量更新

我仔细看了以之前的那个开发者在我的文章 “ “Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”,他的需求是如何使用 update by queryAPI 来进行批量修改文档。在我们的 console 中,我们可以打入如下的命令来进行修改一个文档:

POST products/_update_by_query
{
  "query": {
    "match": {
      "id": "prod1"
    }
  },
  "script": {
    "source": """
      ctx._source['price'] = 50
    """,
    "lang": "painless"
  }
}

当我们运行上面的命令后,我们重新查看之前写入的文档:

GET products/_search?filter_path=**.hits
{
  "hits" : {
    "hits" : [
      {
        "_index" : "products",
        "_id" : "prod2",
        "_score" : 1.0,
        "_source" : {
          "id" : "prod2",
          "name" : "TV",
          "price" : 42
        }
      },
      {
        "_index" : "products",
        "_id" : "prod1",
        "_score" : 1.0,
        "_source" : {
          "price" : 50,
          "name" : "washing machine",
          "id" : "prod1"
        }
      }
    ]
  }
}

从上面的命令中,我们可以看出来 prod1 的 price 被更新为 50。

接下来,我们来通过代码的形式来更新这个文档:

        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest.Builder()
                .index("products")
                .query(q -> q
                        .match( m -> m
                                .field("id")
                                .query("prod1")
                        )
                )
                .script(s -> s.inline( src -> src
                        .lang("painless")
                        .source("ctx._source['price'] = 50")
                ))
                .build();

        UpdateByQueryResponse response_update = client.updateByQuery(updateByQueryRequest);
        System.out.println("updated : " + response_update.updated());

在上面,我们的代码其实和之前的请求非常相似。它操作于一个索引 products 之上,并对它进行 query。我们使用 script 来对它进行修改。运行上面的代码,我们可以看到如下的输出:

updated : 1

它表明的我们的更新是成功的。当然我们也可以在 Kibana 中进行查看。我们会看到文档确实被更新了。

好了,进行的文章就分享到这里。希望大家学到知识!

有关Elasticsearch:在 Java 应用中创建 mappings,批量写入及更新 - Java client 8.x的更多相关文章

  1. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  2. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

  3. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  4. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  5. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  6. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  7. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  8. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  9. ruby-on-rails - 如何在我的 Rails 应用程序 View 中打印 ruby​​ 变量的内容? - 2

    我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby​​中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R

  10. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

随机推荐