分组统计,自动补全,数据同步
桶(bucket):
桶的作用,是按照某种方式对数据进行分组,每一组数据在ES中称为一个桶,ES中提供的划分桶的方式有很多:
日期阶梯分组,例如给定阶梯为周,会自动每周分为一组数值阶梯分组,与日期类似,需要知道分组的间隔(interval)词条内容分组,词条内容完全匹配的为一组数值和日期的范围分组,指定开始和结束,然后按段分组度量(metrics):
分组完成以后,我们一般会对组中的数据进行聚合运算,例如求平均值、最大、最小、求和等,这些在ES中称为度量
常见度量聚合方式:
统计价格在500元之内酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
GET /hotel/_search
{
"query":{ //搜索条件
"range": {
"price":{
"lte": 500
}
}
},
"size": 0, //不查询具体的数据
"aggs": { //声明这是一个聚合查询,是aggregations的缩写
"brandAgg": { //给这次聚合起一个名字,可任意指定
"terms": { //聚合的类型,这里选择terms,是根据词条内容(这里是品牌)划分
"field": "brand", //按照哪个字段分组
"size": 10, //显示多少条聚合结果
"order": {
"_count": "asc" //分组之后可以根据数量排序
}
}
}
}
}
结果:
{
"took" : 4,
"timed_out" : false,
"_shards" : {
//省略
},
"hits" : {
"total" : {
"value" : 95,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ] //不显示搜索的数据结果
},
"aggregations" : {
"brandAgg" : { //桶的名称
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 78,
"buckets" : [
{
"key" : "万丽",
"doc_count" : 1
},
{
"key" : "万怡",
"doc_count" : 1
}
//...省略
]
}
}
}
前面的例子告诉我们每个桶里面的文档数量,这很有用。 但通常,我们的应用需要提供更复杂的文档度量。 例如,每种品牌酒店的平均价格是多少?
因此,我们需要告诉ES使用哪个字段,使用何种度量方式进行运算,这些信息要嵌套在桶内,度量的运算会基于桶内的文档进行
GET /hotel/_search
{
"query":{
"match_all": {}
},
"size": 0, //不查询具体的数据
"aggs": {
"brandAgg": {
"terms": {
"field": "brand", //按照哪个字段分组
"order": {
"scoreAgg.avg": "desc" //根据指定的统计项排序
}
},
"aggs":{ //是brands聚合的子聚合,也就是分组后对每组分别计算
"scoreAgg": {//聚合名称
"stats": {//聚合类型,这里stats可以计算min、max、avg等
"field":"score"//聚合字段,这里是score
}
}
}
}
}
}
结果:
{
"took" : 6,
"timed_out" : false,
"_shards" : { },
"hits" : { },
"aggregations" : {
"brandAgg" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 111,
"buckets" : [
{
"key" : "万丽",
"doc_count" : 2,
"scoreAgg" : {
"count" : 2,
"min" : 46.0,
"max" : 47.0,
"avg" : 46.5,
"sum" : 93.0
}
},
{
"key" : "凯悦",
"doc_count" : 8,
"scoreAgg" : {
"count" : 8,
"min" : 45.0,
"max" : 47.0,
"avg" : 46.25,
"sum" : 370.0
}
},
//省略
}
]
}
}
}

构建测试类AggsTest
//聚合为桶
@Test
public void aggs() throws IOException {
//1. 创建请求对象
SearchRequest request = new SearchRequest("hotel");
//2. 设置条件
request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
request.source().size(0);//不显示查询结果
request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand")
.size(10).order(BucketOrder.count(true)));//集合为桶
//3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4. 处理结果
//4-1获取聚合结果
Terms brandTerm = response.getAggregations().get("brandAgg");
//4-2 获取结果中的桶
for (Terms.Bucket bucket : brandTerm.getBuckets()) {
System.out.println(bucket.getKey() + ":" + bucket.getDocCount());
}
}

//聚合为桶, 桶内度量
@Test
public void aggs2() throws IOException {
//1. 创建请求对象
SearchRequest request = new SearchRequest("hotel");
//2. 设置条件
request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
request.source().size(0);//不显示查询结果
request.source().aggregation(
AggregationBuilders.terms("brandAgg").field("brand").size(10).order(BucketOrder.count(true))//集合为桶
.subAggregation(AggregationBuilders.stats("scoreAgg").field("score"))//按照评分统计
);
//3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4. 处理结果
//4-1获取聚合结果
Terms brandTerm = response.getAggregations().get("brandAgg");
//4-2 获取结果中的桶
for (Terms.Bucket bucket : brandTerm.getBuckets()) {
ParsedStats stats = (ParsedStats) bucket.getAggregations().getAsMap().get("scoreAgg");//获取统计结果
System.out.println(bucket.getKey() + ":" + bucket.getDocCount()+":"+stats.getAvg());
}
}

结果是一个Map结构:
Controller //统计
@PostMapping("/hotel/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams requestParams) throws IOException {
return hotelService.filters(requestParams);
}
ServiceImpl
//统计结果
@Override
public Map<String, List<String>> filters(RequestParams requestParams) throws IOException {
//1. 构建查询请求
SearchRequest request = new SearchRequest("hotel");
//2. 设置查询条件
//2-1 创建复合查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//2-2 获取搜索关键字, 设置为查询条件
String key = requestParams.getKey();
if (StringUtils.isEmpty(key)) {
boolQuery.must(QueryBuilders.matchAllQuery());
} else {
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
//2-3 获取城市、星级、品牌、价格,使用过滤语法筛选
// 城市
if (StrUtil.isNotEmpty(requestParams.getCity())) {
boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
}
// 星级
if (StrUtil.isNotEmpty(requestParams.getStarName())) {
boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
}
// 品牌
if (StrUtil.isNotEmpty(requestParams.getBrand())) {
boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
}
// 价格
if (requestParams.getMinPrice() != null && requestParams.getMaxPrice() != null) {
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(requestParams.getMinPrice()).lte(requestParams.getMaxPrice()));
}
request.source().query(boolQuery);//设置查询条件
request.source().size(0);//不查询信息
//3. 设置统计分析
request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(50));
request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(50));
request.source().aggregation(AggregationBuilders.terms("starNameAgg").field("starName").size(50));
//4. 发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//5. 处理统计结果
Aggregations aggregations = response.getAggregations();
List<String> brandAgg = ((Terms) response.getAggregations().get("brandAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
List<String> cityAgg = ((Terms) response.getAggregations().get("cityAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
List<String> starNameAgg = ((Terms) response.getAggregations().get("starNameAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
Map<String, List<String>> map = new HashMap<>();
map.put("brand",brandAgg);
map.put("city",cityAgg);
map.put("starName",starNameAgg);
return map;
}
① 使用docker容器安装mq
docker run \
-v mq-plugins:/plugins \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
② 登录管理界面
浏览器访问:http://192.168.149.128:15672/
账号: guest
密码: guest
③ 配置虚拟主机、用户
例如:
账号: hotel
密码: hotel
主机: /hotel


① 引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
② 配置文件
spring:
rabbitmq:
host: 192.168.149.128
port: 5672
virtual-host: /hotel
username: hotel
password: hotel
③ 声明队列交换机名称
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
④ 创建交换机和队列
@Configuration
public class MqConfiguration {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.HOTEL_EXCHANGE);
}
// 声明第1个队列
@Bean
public Queue insertQueue() {
return new Queue(MqConstants.HOTEL_INSERT_QUEUE);
}
// 队列1绑定交换机
@Bean
public Binding bindingInsertQueue(TopicExchange topicExchange, Queue insertQueue) {
return BindingBuilder.bind(insertQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY);
}
// 声明第2个队列
@Bean
public Queue deleteQueue() {
return new Queue(MqConstants.HOTEL_DELETE_QUEUE);
}
// 队列2绑定交换机
@Bean
public Binding bindingDeleteQueue(TopicExchange topicExchange, Queue deleteQueue) {
return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY);
}
}

@Component
public class HotelListener {
@Autowired
private RestHighLevelClient client;
@Autowired
private HotelMapper hotelMapper;
// 新增、修改
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void insertOrUpdateHotel(Long id) throws IOException {
// 1.创建request
IndexRequest request = new IndexRequest("hotel").id(id.toString());
// 2.准备DSL
Hotel hotel = hotelMapper.selectById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
System.out.println("es同步新增或更新:" + id);
}
// 删除
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void deleteHotel(Long id) throws IOException {
// 1.创建request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
System.out.println("es同步删除:" + id);
}
}
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件
vim /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
创建集群
编写一个docker-compose.yml文件,内容如下:
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9202:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9203:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge
在docker-compose.yml目录下执行下面命令,运行集群
docker-compose up -d

创建索引库
使用head插件创建索引库,分片设置为3,每个分片设置1个副本
查看分片效果
回到首页,即可查看索引库分片效果:

很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
我们目前正在为ROR3.2开发自定义cms引擎。在这个过程中,我们希望成为我们的rails应用程序中的一等公民的几个类类型起源,这意味着它们应该驻留在应用程序的app文件夹下,它是插件。目前我们有以下类型:数据源数据类型查看我在app文件夹下创建了多个目录来保存这些:应用/数据源应用/数据类型应用/View更多类型将随之而来,我有点担心应用程序文件夹被这么多目录污染。因此,我想将它们移动到一个子目录/模块中,该子目录/模块包含cms定义的所有类型。所有类都应位于MyCms命名空间内,目录布局应如下所示:应用程序/my_cms/data_source应用程序/my_cms/data_ty