草庐IT

【ES小结】还在用ElasticSearch做查询?换条思路实现高效数据统计

派 大 星. 2023-07-12 原文

🏡 博客首页:派 大 星

⛳️ 欢迎关注 🐳 点赞 🎒 收藏 ✏️ 留言

🎢 本文由派大星原创编撰

🚧 系列专栏:《ES小结》

🎈 本系列记录ElasticSearch技术学习历程以及问题解决



ElasticSearch高效数据统计

聚合查询

① 什么是聚合查询

聚合是ES除搜索功能外提供的针对ES数据做统计分析的功能,聚合有助于根据搜索查询提供聚合数据,聚合查询是数据库中重要额功能特性,ES作为搜索引擎兼数据库,同样提供了强大的聚合分析功能力,它是基于查询条件来对数据进行分桶、计算的方法,这种很类似与SQL 中的group by 再加上一些函数方法的操作。

在了解聚合查询之前需要注意的一点是:text类型是不支持聚合的,主要是因为text类型本身是分词的,通俗的说,如果一句话分成了多个词然后进行group by操作,那么问题就出现了,到底对哪一个词进行group by操作呢?无法指定!

② Kibana 命令测试聚合查询

创建测试索引
PUT /fruit
{
    "mappings":{
        "properties":{
            "title":"keyword"
        },
        "price":{
            "type":"double"
        },
        "description":{
            "type":"text"
        }
    }
}

存放测试数据
PUT /fruit/_bulk
{"index":{}}
	{"title":"面包","price":19.6,"description":"小面包很便宜"}
{"index":{}}
	{"title":"旺旺牛奶","price":29.6,"description":"旺旺牛奶很好喝"}
{"index":{}}
	{"title":"日本豆","price":9.0,"description":"日本豆很便宜"}
{"index":{}}
	{"title":"大辣条","price":10.6,"description":"大辣条超级好吃"}
{"index":{}}
	{"title":"海苔","price":49.6,"description":"海苔很一般"}
{"index":{}}
	{"title":"小饼干","price":9.6,"description":"小饼干很小"}
{"index":{}}
	{"title":"小葡萄","price":59.6,"description":"小葡萄很好吃"}	
{"index":{}}
	{"title":"小饼干","price":19.6,"description":"小饼干很小"}
{"index":{}}
	{"title":"小饼干","price":59.6,"description":"小饼干很小"}
{"index":{}}
	{"title":"小饼干","price":29.6,"description":"小饼干很小"}
{"index":{}}
	{"title":"小饼干","price":39.6,"description":"小饼干很小"}

③ 聚合操作使用

根据某个字段分组
GET /fruit/_search
{
  "query": {
    "match_all": {
      
    }
  },
  "aggs": {
    "price_group": {
      "terms": {
        "field": "price"
      }
    }
  }
}

求最大值
GET /fruit/_search
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "max_price": {
      "max": {
        "field": "price"
      }
    }
  }
}

最小值
GET /fruit/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0, 
  "aggs": {
    "min_price": {
      "min": {
        "field": "price"
      }
    }
  }
}

求总数
GET /fruit/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0, 
  "aggs": {
    "min_price": {
      "sum": {
        "field": "price"
      }
    }
  }
}

求平均值
GET /fruit/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0, 
  "aggs": {
    "avg_price": {
      "avg": {
        "field": "price"
      }
    }
  }
}

④ RestHighLevelClient 测试聚合查询

在使用Java API实现上述操作之前,有必要先了解一下实现过程中使用到的某些方法以及工具

常见的聚合查询:

  • 统计某个字段的数量

ValueCountBuilder vcb= AggregationBuilders.count(“分组的名称”).field(“字段”);

  • 去重统计某个字段的数量(有少量的误差)

CardinalityBuilder cb= AggregationBuilders.cardinality(“分组的名称”).field(“字段”);

  • 聚合过滤

FilterAggregationBuilder fab= AggregationBuilders.filter(“分组的名称”).filter(QueryBuilders.queryStringQuery(“字段:过滤值”));

  • 按某个字段分组

TermsBuilder tb= AggregationBuilders.terms(“分组的名称”).field(“字段”);

  • 求最大值

SumBuilder sumBuilder= AggregationBuilders.max(“分组的名称”).field(“字段”);

  • 求最小值

AvgBuilder ab= AggregationBuilders.min(“分组的名称”).field(“字段”);

  • 求平均值

MaxBuilder mb= AggregationBuilders.avg(“分组的名称”).field(“字段”);

  • 按日期间隔分组

DateHistogramBuilder dhb= AggregationBuilders.dateHistogram(“分组的名称”).field(“字段”);

  • 获取聚合里面的结果

TopHitsBuilder thb= AggregationBuilders.topHits(“分组的名称”);

  • 嵌套的聚合

NestedBuilder nb= AggregationBuilders.nested(“分组的名称”).path(“字段”);

  • 反转嵌套

AggregationBuilders.reverseNested(“分组的名称”).path("字段 ");

使用Java API实现上述在Kibana中的各项操作

根据某个字段分组
public class RestHighLevelClientForAggs {
    public static void main(String[] args) {
        RestHighLevelClient esClient = Client.getClient();
        //基于terms 类型聚合 基于字段进行分组聚合
        SearchRequest request = new SearchRequest("fruit");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder
            .query(QueryBuilders.matchAllQuery())//查询条件
            //用来设置聚合处理
         	.aggregation(AggregationBuilders.terms("price_group").field("price"))
            .size(0);
        request.source(sourceBuilder);
        SearchResponse response = null;
        try {
            response = esClient.search(request, RequestOptions.DEFAULT);
            //处理聚合的结果
            Aggregations aggregations = response.getAggregations();
            ParsedDoubleTerms doubleTerms = aggregations.get("price_group");
            List<? extends Terms.Bucket> buckets = doubleTerms.getBuckets();
            for (Terms.Bucket bucket : buckets) {
                System.out.println(bucket.getKey()+" "+bucket.getDocCount());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

求最大值
public class AggregationForMax {
    public static void main(String[] args) {
        RestHighLevelClient client = Client.getClient();


        SearchRequest request = new SearchRequest("fruit");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder
                .query(QueryBuilders.matchAllQuery())
                .aggregation(AggregationBuilders.max("max_price").field("price"))
                .size(0);
        request.source(sourceBuilder);
        try {
            SearchResponse searchResponse =
            client.search(request,RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            ParsedMax maxPrice = aggregations.get("max_price");
            System.out.println(maxPrice.getValueAsString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

注意: 在最终获取分组中的数据时,首先判断所求得的结果是否是Key-Value的结果,比如上述根据某个字段分组的示例从Kibana中就可以看出是Key-Value的形式,所以aggregations.get("分组名称");返回的结果应该为ParsedXXXXTerms类型,如果像求最大值、平均值、最小值等在执行到该aggregations.get("分组名称");返回的结果应该为ParsedXXX类型

求最小值
public class AggregationForMin {
    public static void main(String[] args) {
        RestHighLevelClient client = Client.getClient();
        SearchRequest searchRequest = new SearchRequest("fruit");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        sourceBuilder
                .query(QueryBuilders.matchAllQuery())
                .aggregation(AggregationBuilders.min("min_price").field("price"))
                .size(0);
        searchRequest.source(sourceBuilder);
        try {
            SearchResponse searchResponse = 
                client.search(searchRequest, RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            ParsedMin minPrice = aggregations.get("min_price");
            System.out.println(minPrice.getValueAsString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

等等一系列需求的演示和模拟,使用ES来完成数据的统计。

⑤ 子聚合

先从需求展开,先按照title进行分组,然后再对每一个分组中的成员对价格price进行降序排序

先使用命令在Kibana中实现该操作,其次再根据实现的命令转换为Java代码实现

使用命令操作进行实现

GET /fruit/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0, 
  "aggs": {
    "title_group": {
      "terms": {
        "field": "title"
      },
      "aggs": {
        "sort_price": {
          "terms": {
            "field": "price",
            "order": {
              "_key": "desc"
            }
          }
        }
      }
    }
  }
}

将实现的命令转换为Java流程

public class AggregationForSub {
    public static void main(String[] args) {
        RestHighLevelClient client = Client.getClient();
        SearchRequest searchRequest = new SearchRequest("fruit");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        TermsAggregationBuilder termsAggregationBuilder = 
        AggregationBuilders.terms("title_group").field("title");
        TermsAggregationBuilder subAggregationBuilder = 
        AggregationBuilders.terms("price_sort").field("price").order(BucketOrder.count(false));
        //subAggregation 为子聚合
        termsAggregationBuilder.subAggregation(subAggregationBuilder);
        sourceBuilder
                .query(QueryBuilders.matchAllQuery())
                .aggregation(termsAggregationBuilder)
                .size(0);
        searchRequest.source(sourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            ParsedStringTerms titleGroup = aggregations.get("title_group");
            for (Terms.Bucket bucket : titleGroup.getBuckets()) {
                System.out.println(bucket.getKey()+"--"+bucket.getDocCount());
                Aggregations bucketAggregations = bucket.getAggregations();
                ParsedDoubleTerms priceSort = bucketAggregations.get("price_sort");
                for (Terms.Bucket priceSortBucket : priceSort.getBuckets()) {
                    System.out.println(priceSortBucket.getKey()+"--"+priceSortBucket.getDocCount());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

有关【ES小结】还在用ElasticSearch做查询?换条思路实现高效数据统计的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  5. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  6. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  7. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  8. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  9. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  10. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

随机推荐