千万、亿级别数据批量写入ES的调优和原理解析
Elasticsearch version (bin/elasticsearch --version):
7.8
Plugins installed:
kibana
JVM version (java -version):
java version "1.8.0_102"
OS version (uname -a if on a Unix-like system):
Linux 4.9.0-4-amd64 #1 SMP Debian 4.9.65-3 (2017-12-03) x86_64 GNU/Linux
ES节点:3台,4C16G,JVM8G
ES 是 JAVA 应用——底层存储引擎是基于 Lucene 的
1.1、是JAVA应用,就离不开JVM和GC
内存从大的方面分为堆内内存和堆外内存
1.2、堆外内存概念
堆外一般指堆外内存,英文全称:off-heap memory
堆外内存=物理机内存
堆外内存指的是java虚拟机堆以外的内存,这个区域是受操作系统管理,而不是jvm。
1.3、堆内内存概念
堆内一般指堆内内存,英文全称:on-heap memory (heap:堆,java的内存区)
堆内内存 = 新生代+老年代+持久代
对于JVM,在jvm参数中可使用-Xms,-Xmx等参数就可以设置堆的大小和最大值
1.4、Elasticsearch内部是如何使用这些内存的呢?下面这张图说明了Elasticsearch和Lucene对内存的使用情况。

上图解析:Elasticsearch 限制的内存大小是 JAVA 堆空间的大小,不包括Lucene 缓存倒排索引数据空间。
Node Query Cache的默认大小:
indices.queries.cache.size:10% // 也可以设置为绝对值,比如512mb
index.queries.cache.enabled:true
Indexing Buffer的默认大小:
indices.memory.index_buffer_size:10%
indices.memory.min_index_buffer_size:48mb
indices.memory.max_index_buffer_size:unbounded
修改jvm heap大小:
vim /opt/elasticsearch/config/jvm.options
设置
-Xms10g
-Xms10g
接着停止es集群(kill -9 pid),再启动
su es
./bin/elasticsearch -d
查看是否生效
ps -ef | grep elasticsearch Shard Request Cache的默认大小:
indices.requests.cache.size:1% Field Data Cache的默认大小:
indices.fielddata.cache.size:unbounded 如果不在analyzed string fields上使用聚合,就不会产生Field Data Cache,也就不会使用大量的内存,所以可以考虑分配较小的heap给Elasticsearch。因为heap越小意味着Elasticsearch的GC会比较快,并且预留给Lucene的内存也会比较大。1.5、其他影响内存项
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/modules-threadpool.html
GET /_cat/segments?v
GET /_cat/nodes?v&h=id,ip,port,v,master,name,heap.current,heap.percent,heap.max,
ram.current,ram.percent,ram.max,
fielddata.memory_size,fielddata.evictions,query_cache.memory_size,query_cache.evictions,
request_cache.memory_size,request_cache.evictions,request_cache.hit_count,request_cache.miss_count
indices.breaker.fielddata.limit:60% (默认heap的60%) 如果设置了indices.fielddata.cache.size,当达到size时,cache会剔除旧的fielddata。indices.breaker.fielddata.limit 必须大于 indices.fielddata.cache.size,否则只会触发fielddata circuit breaker,而不会剔除旧的fielddata。1)Elasticsearch默认安装后设置的内存是1GB,这是远远不够用于生产环境的。有两种方式修改Elasticsearch的堆内存:
export ES_HEAP_SIZE=10g 在es启动时会读取该变量;./bin/elasticsearch -Xmx10g -Xms10g2)分配给 es 的内存最好不要超过 32G
我有一个 1 TB 内存的机器!
这个 32 GB 的分割线是很重要的。那如果你的机器有很大的内存怎么办呢? 一台有着 512–768 GB内存的服务器愈发常见。
首先,我们建议避免使用这样的高配机器(参考 硬件)。
但是如果你已经有了这样的机器,你有三个可选项:
- 你主要做全文检索吗?考虑给 Elasticsearch 4 - 32 GB 的内存, 让 Lucene 通过操作系统文件缓存来利用余下的内存。那些内存都会用来缓存 segments,带来极速的全文检索。
- 你需要更多的排序和聚合?而且大部分的聚合计算是在数字、日期、地理点和
非分词字符串上?你很幸运,你的聚合计算将在内存友好的 doc values 上完成! 给 Elasticsearch 4 到 32 GB 的内存,其余部分为操作系统缓存内存中的 doc values。- 你在对分词字符串做大量的排序和聚合(例如,标签或者 SigTerms,等等)不幸的是,这意味着你需要 fielddata,意味着你需要堆空间。考虑在单个机器上运行两个或多个节点,而不是拥有大量 RAM 的一个节点。仍然要坚持 50% 原则。
假设你有个机器有 128 GB 的内存,你可以创建两个节点,每个节点内存分配不超过 32 GB。 也就是说不超过 64 GB 内存给 ES 的堆内存,剩下的超过 64 GB 的内存给 Lucene。
如果你选择这一种,你需要配置
cluster.routing.allocation.same_shard.host: true。 这会防止同一个分片(shard)的主副本存在同一个物理机上(因为如果存在一个机器上,副本的高可用性就没有了)。
参考2.x官方文档

ES写入/索引流程分析如上图,简单分析一下索引写入流程
1、client发起write请求
2、数据写到index buffer和translog中
3、经过1s或者达到10%堆内存阈值后将数据从buffer中的数据写入到segment file,refresh到os cache(FileSystem cache)中,打开供搜索,并清空buffer
4、当translog达到flush_threshold_size大小后,触发commit操作
4-1)将此时buffer中的数据写入到新的segment,并写入到os cache,打开被搜索,然后清空buffer
4-2)一个commit point被写入磁盘文件,标记了被写入的所有index segment file
4-3)同时将os cache中的所有segment file都fsync到DISK中,这个过程叫flush
4-4)清空现有translog日志文件,新建一个translog
文章参考:
配置项参考该类:elasticsearch-hadoop-6.8.21.jar
org.elasticsearch.hadoop.cfg#ConfigurationOptions
主要的配置项如下
1、连接参数配置
"es.http.timeout" -> "5m" ①
"es.http.retries" -> "50" ②
① ② 这两个参数是控制http接口层面的超时及重试,覆盖读请求和写请求,默认值比较小,默认超时时间为1分钟,重试次数为3,建议调整为超时时间5分钟,重试次数50次。
2、写入批次配置
"es.batch.size.bytes" -> "10mb" ①
"es.batch.size.entries" -> "20000" ②"es.batch.write.refresh" -> "false" ③
"es.batch.write.retry.count" -> "10" ④
"es.batch.write.retry.wait" -> "60s" ⑤
① ② 这两个参数控制单次批量写入的数据量大小和条数,数据积累量先达到哪个参数设置,都会触发一次批量写入。增大单次批量写入的数据,可以提高写入ES的整体吞吐。
因为ES的写入一般是顺序写入,在一次批量写入中,很多数据的写入处理逻辑可以合并,大量的IO操作也可以合并。
默认值设置的比较小,可以适当根据集群的规模调大这两个值,建议为20MB和2w条。
③ 是否每次bulk操作后都进行refresh。 每次refresh后,ES会将当前内存中的数据生成一个新的segment。如果refresh速度过快,会产生大量的小segment,大量segment在进行合并时,会消耗磁盘的IO。默认值为开启,如果写时查询要求没那么高,建议设置为false。在索引的settings中通过refresh_interval配置项进行控制,可以根据业务的需求设置为30s或-1,index_buffer默认是堆的10%,满了就会refresh
④ ⑤ 这两个参数会控制单次批量写入请求的重试次数,以及重试间隔。当超过重试次数后,Yarn任务管理会将该任务标记为failed,造成整个写数据任务的失败。默认值为3,为了防止集群偶发的网络抖动或压力过大造成的集群短暂熔断,建议将这个值调大,设置为50。
当每条数据比较均匀的时候,用es.batch.size.entries限制批量写入条数比较合适,但是当每条数据不均匀时,建议用es.batch.size.bytes限制每批次的写入数据量比较合适。当然,bulk size不能无限的增大,会造成写入任务的积压。
实际效果:spark调优过程比较便捷,基于上述配置,可以达到 “单机4C16G——10个线程——20w/s的写入速度”,5KW数据量-45min左右可写入完成
ElasticSearch的配置选项分为静态设置和动态设置两种,静态设置必须在结点级别(node-level)设置,或配置在elasticsearch.yml配置文件中,或配置在环境变量中,或配置在命令行中,在结点启动之后,静态设置不能修改;动态索引可以创建时添加或者创建后再添加或者修改
5.1、bulk批量写入
5.2、多线程写入
5.3、修改translog flush 间隔
index.translog.durability: async // 刷新策略,默认request
index.translog.sync_interval:120s // 默认5s,translog buffer到文件的刷新时间
index.translog.flush_threshold_size:1gb
/**
flush阈值,默认512MB;超过512M,触发flush操作
会将index buffer中的数据,refresh到os cache中
产生新的segment file,如果这个值太小,
就会频繁发生refresh、merge和flush
**/ 5.4、修改索引刷新时间及副本数
curl -XPUT "http://localhost:9200/myindex/_settings" -H 'Content-Type: application/json' -d'
{
"index.number_of_replicas": 0,
"index.refresh_interval": "120s"
}'
// org.elasticsearch.action.support.WriteRequest.RefreshPolicy#WriteRequestBuilder
default B setRefreshPolicy(RefreshPolicy refreshPolicy) {
request().setRefreshPolicy(refreshPolicy);
return (B) this;
} 枚举org.elasticsearch.action.support.WriteRequest.RefreshPolicy定义了三种策略:
NONE,
IMMEDIATE,
WAIT_UNTIL;
可知有以下三种刷新策略:
RefreshPolicy#IMMEDIATE:RefreshPolicy#WAIT_UNTIL:RefreshPolicy#NONE:5.5、修改merge参数以及线程数
cat /sys/block/*/queue/rotational
-- 0 SSD // 固态硬盘
-- 1 HDD // 机械硬盘 | 参数修改 | 好处 | 坏处 |
|---|---|---|
| index.merge.policy.max_merge_at_once index.merge.policy.segments_per_tier (eg :50) | 提升indexing速度 | 减少了segment merge动作的发生,意味着更多的segments,会降低searching速度 |
| index.merge.policy.max_merge_at_once index.merge.policy.segments_per_tier (eg :5) | Segments更少,即能够提升searching速度 | 更多的segments merge操作,会花费更多系统资源(CPU/IO/RAM),会降低indexing速度 |
curl -XPUT "https://localhost:9200/index/_settings?pretty" -H 'Content-Type: application/json' -d'
{
"index": {
"merge": {
"scheduler": {
"max_thread_count": "1"
},
"policy": {
"segments_per_tier": "24",
"max_merge_at_once": "24",
"floor_segment": "2m",
"max_merged_segment": "2g"
}
}
}
}'
5.6、禁用Doc Values
curl -XPUT "http://localhost:9200/myindex" -H 'Content-Type: application/json' -d'
{
"mappings": {
"my_type": {
"properties": {
"session_id": {
"type": "keyword",
"doc_values": false
}
}
}
}
}'
5.7、禁用_source字段
curl -XPUT 'https://ip:port/index?pretty' -H 'Content-Type: application/json' -d'
{
"mappings": {
"tweet": {
"_source": {
"enabled": false
}
}
}
}'
说明: 在禁用_source 字段之前请注意:如果_source字段不可用,则不支持以下功能:
5.8、自动生成DOC_ID
5.9、禁用 _source 和禁用 _all 字段
5.10、bulk api解决超时问题
BulkRequest request = new BulkRequest();
request.timeout("2m"); // 索引操作超时时间
// option-1
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setRequestConfigCallback(
new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(5000); // 获取连接的超时时间
}
});
// option-2
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(5000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(5000)
.build();
RequestOptions options = RequestOptions.DEFAULT.toBuilder()
.setRequestConfig(requestConfig)
.build();
RequestConfig有三个超时如下
kibana监控模块通过调用es索引存储的监控数据,制作了许多开箱即用报表供用户使用。主要分为集群层面、节点层面和索引层面
kibana通过es索引中存储的数据计算出了许多指标报表,如上图所示包含了查询(加载)速率和查询(加载)延时,除此之外还有cpu、内存、存储以及负载占用等等许多指标
参考文章
es实战-Monitoring原理讲解及kibana可视化实战_casterQ的博客-CSDN博客_kibana的monitoring
1、3台4C16G的ES集群
2、字段数100个,
3、字段长度50字节,
4、一条记录打满5KB,
5、数据量5000w,
6、运行资源标准1C4G(倍数增长),
7、并发度即程序线程数,8、UUID作为DOC_ID
调优过程比较多,截取有针对性提升的时间节点对应的调优项
| 调优项 | 批次/条 | 运行资源 | 并发度 | 同步效率 | 同步时间 |
|---|---|---|---|---|---|
| jvm heap 1G | 5000 | 2C8G | 4 | / | 211.08min |
| jvm heap 8G | 3000 | 4C16G | 8 | 50w/min | 159min |
| jvm heap 8G | 2000 | 4C16G | 8 | 50w/min | 154min |
| +refresh_interval:10s +max_thread_count:1 +number_replicas:0 | 2000 | 6C24G | 12 | 75w/min | 77min |
| +index_buffer_size:20% | 5000 | 6C24G | 12 | 76w/min | 72.73min |
| +refresh_interval:20s +max_merge_at_once:50 +segment_per_tier:50 | 5000 | 6C24G | 16 | 78w/min | 70min |
| +number_of_shards:3 +translog.durability:async +translog.sync_interval:60s +flush_threshold_size:1g | 5000 | 6C24G | 16 | 90w/min | 57.62min |
| 继承上述调优项 | 5000 | 2C8G | 4 | 60w/min | 92.77min |
| +refresh_interval:60s | 5000 | 2C8G | 4 | 62w/min | 86.87min |
| 继承上述调优项 | 5000 | 4C16G | 12 | 92w/min | 59.35min |
{
"settings": {
"index": {
"refresh_intervals": "60s",
"translog": {
"flush_threshold_size": "1g",
"sync_interval": "60s",
"durability": "async"
},
"unassigned": {
"node_left": {
"delayed_timeout": "3h"
}
},
"number_of_replicas": 0,
"merge": {
"scheduler": {
"max_thread_count": 1
},
"policy": {
"segment_per_tier": 50,
"max_merge_at_once": 50
}
}
}
}
}
1)方法比结论重要。一个系统性问题往往是多种因素造成的,在处理集群的写入性能问题上,了解原理后,需要将问题分解,在单台机子上进行压测,观察哪种系统资源达到极限;例如:CPU、磁盘利用率、I/O block、线程切换、堆栈状态;然后分析并调整参数,优化单台能力,先解决局部问题,在此基础上解决整体问题效率更高
2)可以使用更好的CPU,或者SSD,对写入性能提升明显
3)若能达到3w/s的写入性能,优化就差不多了
我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我正在使用ruby1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\
好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co