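elasticsearch supports many kinds of aggregation queries, which gives us powerful tools for data statistics and data analysis. But as java developers, how do we implement these aggregations in the java client?
We know that spring-data-elasticsearch provides an es java client integrated with spring, but neither the elastic nor the spring-data official documentation explains in detail how to implement aggregation queries in the java client.
So in this post, our goal is to cover these aggregation operations in one place.
To explain them clearly, we follow the structure of the es official documentation and go through the three types of aggregation (bucket, metric and pipeline) one by one. We will not demonstrate every sub-type, but after seeing a few commonly used ones you should be able to work out how to use the others. If you really get stuck, leave a comment and we can figure it out together.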

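This demonstration is based on the following environment:
spring-data-elasticsearch 3.2.12.RELEASE
For the basic environment setup, see this article:
Setting up a springboot + spring data elasticsearch 3.x environment from scratch
Before we start, here is the index structure used throughout, so that the later examples are easier to follow: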
# Order index; one order contains multiple products
PUT order_test
{
  "mappings": {
    "properties": {
      // Order status: 0 unpaid, 1 awaiting shipment, 2 in transit, 3 awaiting receipt, 4 received
      "status": {
        "type": "integer"
      },
      // Order number
      "no": {
        "type": "keyword"
      },
      // Time the order was placed
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      // Order amount
      "amount": {
        "type": "double"
      },
      // Creator
      "creator": {
        "type": "keyword"
      },
      // Product information
      "product": {
        "type": "nested",
        "properties": {
          // Product ID
          "id": {
            "type": "keyword"
          },
          // Product name
          "name": {
            "type": "keyword"
          },
          // Product price
          "price": {
            "type": "double"
          },
          // Product quantity
          "quantity": {
            "type": "integer"
          }
        }
      }
    }
  }
}
Test data, so that you can follow along:
POST order_test/_bulk
{"index":{}}
{"status":0,"no":"DD202205280001","create_time":"2022-05-01 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"1","name":"苹果","price":20.0,"quantity":5}]}
{"index":{}}
{"status":0,"no":"DD202205280002","create_time":"2022-05-01 12:00:00","amount":100.0,"creator":"李四","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":1,"no":"DD202205280003","create_time":"2022-05-02 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":2,"no":"DD202205280004","create_time":"2022-05-01 12:00:00","amount":150.0,"creator":"王二","product":[{"id":"1","name":"苹果","price":30.0,"quantity":5}]}
{"index":{}}
{"status":2,"no":"DD202205280005","create_time":"2022-05-03 12:00:00","amount":100.0,"creator":"55555","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":3,"no":"DD202205280006","create_time":"2022-05-04 12:00:00","amount":150.0,"creator":"李四","product":[{"id":"3","name":"榴莲","price":150.0,"quantity":1}]}
{"index":{}}
{"status":4,"no":"DD202205280007","create_time":"2022-05-04 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":3,"no":"DD202205280008","create_time":"2022-05-01 12:00:00","amount":200.0,"creator":"王二","product":[{"id":"1","name":"苹果","price":40.0,"quantity":5}]}
{"index":{}}
{"status":4,"no":"DD202205280009","create_time":"2022-05-03 12:00:00","amount":100.0,"creator":"55555","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
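Among bucket aggregations, the terms aggregation is the most commonly used. It groups documents by the values of a given field, similar to group by in mysql.
Requirement: count the number of orders in each status.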
GET order_test/_search
{
"size": 0,
"aggs": {
"status_bucket": {
"terms": {
"field": "status"
}
}
}
}

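import java.util.HashMap;
import java.util.List;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

// restTemplate is assumed to be an injected ElasticsearchRestTemplate field
public void termsAgg() {
    String aggName = "status_bucket";
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    // PageRequest rejects a page size of 0, so request a single hit and ignore it
    queryBuilder.withPageable(PageRequest.of(0, 1));
    TermsAggregationBuilder termsAgg = AggregationBuilders.terms(aggName).field("status");
    queryBuilder.addAggregation(termsAgg);
    // Extract only the aggregations from the raw search response
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    Terms terms = aggregations.get(aggName);
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    // status -> number of orders in that status
    HashMap<String, Long> statusRes = new HashMap<>();
    buckets.forEach(bucket -> statusRes.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("--- aggregation result ---");
    System.out.println(statusRes);
}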
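To check the buckets you get back, it can help to compute the same grouping by hand. The following is a minimal plain-Java sketch, not part of the es client: the class name TermsCountSketch and its helper are hypothetical, and the status values are copied from the nine test orders above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, only for cross-checking the terms aggregation by hand
class TermsCountSketch {
    // status of each of the nine sample orders, in bulk-request order
    static final int[] STATUSES = {0, 0, 1, 2, 2, 3, 4, 3, 4};

    // Group by status and count, like the doc_count of each terms bucket
    static Map<String, Long> countByStatus(int[] statuses) {
        return IntStream.of(statuses)
                .mapToObj(String::valueOf)
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByStatus(STATUSES)); // {0=2, 1=1, 2=2, 3=2, 4=2}
    }
}
```

The map printed by termsAgg should contain the same five status keys with the same counts, although a HashMap does not guarantee the print order.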
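The date histogram aggregation groups documents by date and is often used for date-based trend statistics.
Requirement: count the number of orders placed each day.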
GET order_test/_search
{
"size": 0,
"aggs": {
"date": {
"date_histogram": {
"field": "create_time",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
}
}
}
}

public void dateHistogramAgg() {
    String aggName = "date";
    // One bucket per calendar day, with bucket keys formatted as yyyy-MM-dd
    DateHistogramAggregationBuilder dateHistogramAggregation = AggregationBuilders.dateHistogram(aggName)
            .field("create_time")
            .calendarInterval(DateHistogramInterval.days(1))
            .format("yyyy-MM-dd");
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withPageable(PageRequest.of(0, 1)).addAggregation(dateHistogramAggregation);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedDateHistogram histogram = aggregations.get(aggName);
    List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
    // LinkedHashMap keeps the buckets in the date order returned by es
    Map<String, Long> resultMap = new LinkedHashMap<>();
    buckets.forEach(bucket -> resultMap.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("--- aggregation result ---");
    System.out.println(resultMap);
}
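As a cross-check for the daily counts, here is a minimal plain-Java sketch that groups the create_time values of the test data by day. The class DailyCountSketch is hypothetical and does not use the es client; it only mirrors what the date_histogram buckets should contain for this data.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper class, only for cross-checking the date_histogram result by hand
class DailyCountSketch {
    // Same pattern as the create_time mapping
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // create_time of each of the nine sample orders, in bulk-request order
    static final List<String> CREATE_TIMES = Arrays.asList(
            "2022-05-01 12:00:00", "2022-05-01 12:00:00", "2022-05-02 12:00:00",
            "2022-05-01 12:00:00", "2022-05-03 12:00:00", "2022-05-04 12:00:00",
            "2022-05-04 12:00:00", "2022-05-01 12:00:00", "2022-05-03 12:00:00");

    // Truncate each timestamp to its day and count per day; TreeMap keeps days sorted
    static Map<LocalDate, Long> countPerDay(List<String> times) {
        return times.stream()
                .map(t -> LocalDateTime.parse(t, FMT).toLocalDate())
                .collect(Collectors.groupingBy(d -> d, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // {2022-05-01=4, 2022-05-02=1, 2022-05-03=2, 2022-05-04=2}
        System.out.println(countPerDay(CREATE_TIMES));
    }
}
```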
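Here you will notice that the result is received as ParsedDateHistogram, unlike the Terms type used above. So how do we know which type to use in which case? In practice, you can find out with a breakpoint.
Set a breakpoint right after restTemplate.query returns aggregations and inspect the value: each element in aggregations shows its concrete type, here ParsedDateHistogram, so you only need to use whatever type is shown.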

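The range aggregation groups documents by the numeric ranges we specify.
Requirement: count the orders whose amount falls in each of the ranges 0~100, 100~200 and 200+.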
GET order_test/_search
{
"size": 0,
"aggs": {
"date_range": {
"range": {
"field": "amount",
"ranges": [
{
"to": "100"
},
{
"from": "100",
"to": "200"
},
{
"from": "200"
}
]
}
}
}
}

public void rangeAgg() {
    String aggName = "range";
    // Three buckets: below 100, from 100 up to 200, and 200 or above
    RangeAggregationBuilder agg = AggregationBuilders.range(aggName).field("amount")
            .addUnboundedTo(100)
            .addRange(100, 200)
            .addUnboundedFrom(200);
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withPageable(PageRequest.of(0, 1)).addAggregation(agg);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedRange range = aggregations.get(aggName);
    List<? extends Range.Bucket> buckets = range.getBuckets();
    // range key -> number of orders in that range
    HashMap<String, Long> resultMap = new HashMap<>();
    buckets.forEach(bucket -> resultMap.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("--- aggregation result ---");
    System.out.println(resultMap);
}
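One detail worth checking by hand is the boundary handling: in the range aggregation each range includes its from value and excludes its to value, so an amount of exactly 100 lands in 100~200 and an amount of exactly 200 lands in 200+. The following plain-Java sketch applies that rule to the amounts of the test data. The class RangeBucketSketch is hypothetical, and the bucket key strings are only an assumption about how the keys are rendered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class, only for cross-checking the range aggregation by hand
class RangeBucketSketch {
    // amount of each of the nine sample orders, in bulk-request order
    static final double[] AMOUNTS = {100, 100, 100, 150, 100, 150, 100, 200, 100};

    // from is inclusive and to is exclusive, as in the es range aggregation
    static Map<String, Long> bucketize(double[] amounts) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        buckets.put("*-100.0", 0L);      // key format is an assumption
        buckets.put("100.0-200.0", 0L);
        buckets.put("200.0-*", 0L);
        for (double amount : amounts) {
            String key = amount < 100 ? "*-100.0" : amount < 200 ? "100.0-200.0" : "200.0-*";
            buckets.merge(key, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        System.out.println(bucketize(AMOUNTS)); // {*-100.0=0, 100.0-200.0=8, 200.0-*=1}
    }
}
```

With this data the first bucket is empty, because no order has an amount below 100.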
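The nested aggregation is specifically for aggregating on nested json sub-objects. In the example above, product is an array of json objects mapped as the nested type, so when we want to aggregate on a property of the products we need the nested aggregation. Aggregating directly on product.name does not give the expected result, because es indexes each element of a nested array as a separate hidden document.
Requirement: count the number of orders for each product.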
GET order_test/_search
{
"size": 0,
"aggs": {
"product_nested": {
"nested": {
"path": "product"
},
"aggs": {
"name_bucket": {
"terms": {
"field": "product.name"
}
}
}
}
}
}

Java: use the subAggregation method to define the sub-aggregation.
public void nestedAgg() {
    String aggName = "product_nested";
    String termsAggName = "name_bucket";
    // The nested aggregation switches to the product documents; the terms sub-aggregation groups them by name
    NestedAggregationBuilder aggregationBuilder = AggregationBuilders.nested(aggName, "product")
            .subAggregation(AggregationBuilders.terms(termsAggName).field("product.name"));
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            .addAggregation(aggregationBuilder);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedNested nestedRes = aggregations.get(aggName);
    // The terms result sits one level below the nested aggregation
    Terms terms = nestedRes.getAggregations().get(termsAggName);
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    // product name -> number of matching product entries
    HashMap<String, Long> resMap = new HashMap<>();
    buckets.forEach(bucket -> resMap.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("--- aggregation result ---");
    System.out.println(resMap);
}
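A caveat on what is being counted: the doc_count of a terms bucket under a nested aggregation counts nested product entries, not orders. In the test data every order has exactly one product entry, so the two numbers agree, but an order listing the same product twice would be counted twice; in es, a reverse_nested sub-aggregation is the usual way to get back to an order count. The plain-Java sketch below illustrates the difference. The class NestedCountSketch is hypothetical, it groups by product id instead of name to keep the data short, and the two-entry order in main is invented for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class showing nested-entry counts versus order counts
class NestedCountSketch {
    // product ids per sample order (1, 2 and 3 are the three products in the test data)
    static final String[][] SAMPLE = {{"1"}, {"2"}, {"2"}, {"1"}, {"2"}, {"3"}, {"2"}, {"1"}, {"2"}};

    // Counts every product entry, like doc_count under the nested aggregation
    static Map<String, Long> countLineItems(String[][] orders) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] order : orders) {
            for (String id : order) {
                counts.merge(id, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Counts each order at most once per product
    static Map<String, Long> countOrders(String[][] orders) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] order : orders) {
            for (String id : new HashSet<>(Arrays.asList(order))) {
                counts.merge(id, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countLineItems(SAMPLE)); // {1=3, 2=5, 3=1}
        // Invented order that lists product 1 twice
        String[][] repeated = {{"1", "1"}};
        System.out.println(countLineItems(repeated)); // {1=2}
        System.out.println(countOrders(repeated));    // {1=1}
    }
}
```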
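The sum aggregation is one of the most commonly used aggregations. It is often combined with bucket aggregations to compute the total within each group.
Requirement: compute the total sales amount on May 1st.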
GET order_test/_search
{
"query": {
"range": {
"create_time": {
"format": "yyyy-MM-dd",
"from": "2022-05-01",
"to": "2022-05-01"
}
}
},
"size": 0,
"aggs": {
"sum_amount": {
"sum": {
"field": "amount"
}
}
}
}

public void sumAgg() {
    String aggName = "sumAmount";
    SumAggregationBuilder agg = AggregationBuilders.sum(aggName).field("amount");
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            // Restrict the aggregation to orders created on 2022-05-01
            .withQuery(QueryBuilders.rangeQuery("create_time").format("yyyy-MM-dd").from("2022-05-01").to("2022-05-01"))
            .addAggregation(agg);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    // A metric aggregation returns a single value instead of buckets
    ParsedSum metric = aggregations.get(aggName);
    double value = metric.getValue();
    System.out.println("--- aggregation result ---");
    System.out.println(value);
}
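Scripted aggregation lets us use a script to define the value that is aggregated; the default scripting language in es is painless. Note that scripts have a significant performance cost, so we generally try to avoid them. es also provides a dedicated scripted metric aggregation, but since it is not used very often, we demonstrate the more common approach of using a script inside an ordinary aggregation.
Requirement: compute the average unit price across all products.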
GET order_test/_search
{
"size": 0,
"aggs": {
"total_amount":{
"sum": {
"field": "amount"
}
},
"total_quantity":{
"sum": {
"script": {
"source": """
int total = 0;
for(int i=0; i<params._source['product'].size(); i++){
if(params._source['product'][i]['quantity'] != null){
total += params._source['product'][i]['quantity'];
}
}
return total;
"""
}
}
}
}
}
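As you can see, the sum aggregation, which normally takes a field attribute, now takes a script that computes the value dynamically. Likewise, scripts can be used in other metric aggregations, not only in sum.
Dividing total_amount by total_quantity from the result gives the average price.
Java: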
public void scriptAgg() {
    String totalAmountAggName = "total_amount";
    String totalQuantityAggName = "total_quantity";
    SumAggregationBuilder amountAgg = AggregationBuilders.sum(totalAmountAggName).field("amount");
    // The painless script adds up the quantity of every product entry in the order's _source
    SumAggregationBuilder quantityAgg = AggregationBuilders.sum(totalQuantityAggName).script(
            new Script("int total = 0;\n" +
                    " for(int i=0; i<params._source['product'].size(); i++){\n" +
                    " if(params._source['product'][i]['quantity'] != null){\n" +
                    " total += params._source['product'][i]['quantity'];\n" +
                    " }\n" +
                    " }\n" +
                    " return total;"));
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            .addAggregation(amountAgg).addAggregation(quantityAgg);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedSum amountRes = aggregations.get(totalAmountAggName);
    ParsedSum quantityRes = aggregations.get(totalQuantityAggName);
    // Average unit price = total amount / total quantity
    double avgPrice = amountRes.getValue() / quantityRes.getValue();
    System.out.println("--- aggregation result ---");
    System.out.println(avgPrice);
}
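The arithmetic can be checked by hand against the test data. The sketch below is plain Java with a hypothetical class name, AvgPriceSketch; it sums the amounts and the product quantities of the nine test orders and divides one by the other, which is the same calculation the two sum aggregations feed into.

```java
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

// Hypothetical helper class, only for cross-checking the average unit price by hand
class AvgPriceSketch {
    // amount and product quantity of each of the nine sample orders, in bulk-request order
    static final double[] AMOUNTS = {100, 100, 100, 150, 100, 150, 100, 200, 100};
    static final int[] QUANTITIES = {5, 5, 5, 5, 5, 1, 5, 5, 5};

    static double totalAmount() {
        return DoubleStream.of(AMOUNTS).sum();
    }

    static int totalQuantity() {
        return IntStream.of(QUANTITIES).sum();
    }

    // total_amount / total_quantity, i.e. a quantity-weighted average unit price
    static double avgUnitPrice() {
        return totalAmount() / totalQuantity();
    }

    public static void main(String[] args) {
        System.out.println(totalAmount());   // 1100.0
        System.out.println(totalQuantity()); // 41
        System.out.printf("%.2f%n", avgUnitPrice());
    }
}
```

For this data the totals are 1100 and 41, so the average unit price is roughly 26.83.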
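First we need to understand the concept of pipeline aggregations. Unlike other aggregations, a pipeline aggregation operates on the results of other aggregations, so it always works together with them instead of operating directly on document data.
The bucket script aggregation performs a second calculation on the metrics of a multi-bucket aggregation. Let's see how it is used through an example.
Requirement: compute the average unit price of the products in each order.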
GET order_test/_search
{
"size": 0,
"aggs": {
"order_bucket": {
"terms": {
"field": "no"
},
"aggs": {
"total_amount": {
"sum": {
"field": "amount"
}
},
"total_quantity": {
"sum": {
"script": {
"source": """
int total = 0;
for(int i=0; i<params._source['product'].size(); i++){
if(params._source['product'][i]['quantity'] != null){
total += params._source['product'][i]['quantity'];
}
}
return total;
"""
}
}
},
"avg_price": {
"bucket_script": {
"buckets_path": {
"amount": "total_amount",
"quantity": "total_quantity"
},
"script": "params.amount / params.quantity"
}
}
}
}
}
}

Java: here the aggregation builder is no longer the AggregationBuilders class but PipelineAggregatorBuilders; the rest of the usage is similar.
public void bucketScriptAgg() {
    String aggName = "order_bucket";
    String totalAmountAggName = "total_amount";
    String totalQuantityAggName = "total_quantity";
    String avgPriceAggName = "avg_price";
    // Script variable name -> name of the sibling aggregation that supplies its value
    HashMap<String, String> bucketsPath = new HashMap<>();
    bucketsPath.put("amount", totalAmountAggName);
    bucketsPath.put("quantity", totalQuantityAggName);
    TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(aggName).field("no")
            .subAggregation(AggregationBuilders.sum(totalAmountAggName).field("amount"))
            .subAggregation(AggregationBuilders.sum(totalQuantityAggName).script(
                    new Script("int total = 0;\n" +
                            " for(int i=0; i<params._source['product'].size(); i++){\n" +
                            " if(params._source['product'][i]['quantity'] != null){\n" +
                            " total += params._source['product'][i]['quantity'];\n" +
                            " }\n" +
                            " }\n" +
                            " return total;")
            ))
            // Pipeline aggregation: computed per bucket from the two sums above
            .subAggregation(PipelineAggregatorBuilders.bucketScript(avgPriceAggName, bucketsPath,
                    new Script("params.amount / params.quantity")));
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            .addAggregation(aggregationBuilder);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    // order number -> average unit price
    HashMap<String, Double> resultMap = new HashMap<>();
    Terms terms = aggregations.get(aggName);
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    buckets.forEach(bucket -> {
        ParsedSimpleValue avgRes = bucket.getAggregations().get(avgPriceAggName);
        resultMap.put(bucket.getKeyAsString(), avgRes.value());
    });
    System.out.println("--- aggregation result ---");
    System.out.println(resultMap);
}
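bucket sort provides custom sorting and paging over aggregation results, and is very commonly used to sort buckets. If you are not familiar with this aggregation, see my earlier post that introduces it.
Requirement: find the TOP 5 orders by average unit price of their products.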
GET order_test/_search
{
"size": 0,
"aggs": {
"order_bucket": {
"terms": {
"field": "no"
},
"aggs": {
"total_amount": {
"sum": {
"field": "amount"
}
},
"total_quantity": {
"sum": {
"script": {
"source": """
int total = 0;
for(int i=0; i<params._source['product'].size(); i++){
if(params._source['product'][i]['quantity'] != null){
total += params._source['product'][i]['quantity'];
}
}
return total;
"""
}
}
},
"avg_price": {
"bucket_script": {
"buckets_path": {
"amount": "total_amount",
"quantity": "total_quantity"
},
"script": "params.amount / params.quantity"
}
},
"avg_price_sort": {
"bucket_sort": {
"sort": [
{"avg_price":{"order":"desc"}}
],
"from": 0,
"size": 5
}
}
}
}
}
}
Java: as above, the aggregation builder used here is no longer the AggregationBuilders class but PipelineAggregatorBuilders; the rest of the usage is similar.
public void bucketSortAgg() {
    String aggName = "order_bucket";
    String totalAmountAggName = "total_amount";
    String totalQuantityAggName = "total_quantity";
    String avgPriceAggName = "avg_price";
    String bucketSortAggName = "avg_price_sort";
    // Script variable name -> name of the sibling aggregation that supplies its value
    HashMap<String, String> bucketsPath = new HashMap<>();
    bucketsPath.put("amount", totalAmountAggName);
    bucketsPath.put("quantity", totalQuantityAggName);
    // Sort buckets by the computed average price, highest first
    List<FieldSortBuilder> sortList = new ArrayList<>();
    FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(avgPriceAggName).order(SortOrder.DESC);
    sortList.add(fieldSortBuilder);
    TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(aggName).field("no")
            .subAggregation(AggregationBuilders.sum(totalAmountAggName).field("amount"))
            .subAggregation(AggregationBuilders.sum(totalQuantityAggName).script(
                    new Script("int total = 0;\n" +
                            " for(int i=0; i<params._source['product'].size(); i++){\n" +
                            " if(params._source['product'][i]['quantity'] != null){\n" +
                            " total += params._source['product'][i]['quantity'];\n" +
                            " }\n" +
                            " }\n" +
                            " return total;")
            ))
            .subAggregation(PipelineAggregatorBuilders.bucketScript(avgPriceAggName, bucketsPath,
                    new Script("params.amount / params.quantity")))
            // Keep only the first 5 buckets after sorting
            .subAggregation(PipelineAggregatorBuilders.bucketSort(bucketSortAggName, sortList).from(0).size(5));
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            .addAggregation(aggregationBuilder);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    // The output must preserve the sort order, so use LinkedHashMap; HashMap does not keep insertion order
    LinkedHashMap<String, Double> resultMap = new LinkedHashMap<>();
    Terms terms = aggregations.get(aggName);
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    buckets.forEach(bucket -> {
        ParsedSimpleValue avgRes = bucket.getAggregations().get(avgPriceAggName);
        resultMap.put(bucket.getKeyAsString(), avgRes.value());
    });
    System.out.println("--- aggregation result ---");
    System.out.println(resultMap);
}
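To see what the two pipeline steps should produce for the test data, the following plain-Java sketch computes amount / quantity per order (the bucket_script step) and then sorts descending and keeps the first five (the bucket_sort step). The class TopAvgPriceSketch is hypothetical and does not use the es client. Several orders tie at an average of 20, and the order in which es returns tied buckets is not something this sketch tries to reproduce, so only the first three entries are meaningful to compare.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class mirroring bucket_script followed by bucket_sort
class TopAvgPriceSketch {
    // order number, amount and product quantity of the nine sample orders
    static final String[] NOS = {
            "DD202205280001", "DD202205280002", "DD202205280003",
            "DD202205280004", "DD202205280005", "DD202205280006",
            "DD202205280007", "DD202205280008", "DD202205280009"};
    static final double[] AMOUNTS = {100, 100, 100, 150, 100, 150, 100, 200, 100};
    static final int[] QUANTITIES = {5, 5, 5, 5, 5, 1, 5, 5, 5};

    static Map<String, Double> topByAvgPrice(String[] nos, double[] amounts, int[] quantities, int size) {
        // bucket_script step: one average unit price per order
        List<Map.Entry<String, Double>> rows = new ArrayList<>();
        for (int i = 0; i < nos.length; i++) {
            rows.add(new AbstractMap.SimpleEntry<>(nos[i], Double.valueOf(amounts[i] / quantities[i])));
        }
        // bucket_sort step: highest average first, then keep the first `size` rows
        rows.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        Map<String, Double> top = new LinkedHashMap<>();
        for (Map.Entry<String, Double> row : rows.subList(0, Math.min(size, rows.size()))) {
            top.put(row.getKey(), row.getValue());
        }
        return top;
    }

    public static void main(String[] args) {
        // Starts with DD202205280006=150.0, DD202205280008=40.0, DD202205280004=30.0
        System.out.println(topByAvgPrice(NOS, AMOUNTS, QUANTITIES, 5));
    }
}
```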
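Note that not every DSL query can be implemented in spring-data-elasticsearch: some operations run fine in kibana but cannot be executed through spring-data-elasticsearch. Also make sure that the es version matches the spring-data-elasticsearch version, as shown in the figure below.

To understand the java client in depth, the best approach is to try these operations yourself.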