2020年团队决定对elasticsearch升级。es(elasticsearch缩写,下同)当前版本为0.9x,升级到5.x版本。es在本公司承载三个部分的业务,站内查询,订单数据统计,elk日志分析。
对于站内查询和订单数据统计,当前业务架构是
mysql -> canal -> kafka -> es
(可以考虑使用kafka connector 代替canal)
难点是在升级的时候如何不影响当前业务。
下载5.x版本的es,在新的机器上部署新的集群。
由于从0.9x到5.x版本跨度比较大,许多java api都发生了变化,需要修复。
一个坑是alias api 发生了语义变化,在后来的自测中修复了此问题。
我们使用索引重建程序来新建索引。重建索引具体步骤如下,我们称线上索引为online index, 新创建的索引为new index。
1.init
刷新索引名映射关系,检查当前alias只有一个物理索引。
根据预定义的mapping,创建索引new index。
设置在线索引记录数据变更日志,即记录线上索引消费kafka数据,并存储为change log文件.
2.全量索引数据库上的数据到new index
从mysql查出数据同步到es中,如果有多个分表,就按照表顺序同步。可以开启多线程批量插入。
3.对new index索引优化
refresh, flush 索引。调用force-merge api,进行段合并。
4.重放change log到new index中
根据change log 转换为es query,写入到new index。
5.暂停线上索引的写入
因为online index和new index 使用的是相同的kafka consumer group,所以必须停掉online index的消费功能。
6.关闭change log
停止记录在线索引记录数据变更日志。
7.第二阶段重放change log
根据change log 转换为es query,写入到new index。
8.删除change log
删除线索引记录数据变更日志。
9.设置副本数
new index创建索引的时候默认副本数为0,现在动态调整副本数为业务需要的值。比如对现实搜索业务设置两个副本,对订单统计类索引不需要副本。
PUT /new_index/_settings
{
"number_of_replicas": 2
}
此阶段可能会比较耗时,需要等待几分钟才能进行下一步操作。更好的做法是调用health api 查看分片状态。
GET _cluster/health
{
"cluster_name" : "testcluster",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 1,
"active_shards" : 1,
"relocating_shards" : 0, // 重新定位的分片
"initializing_shards" : 0, // 初始化中的分片
"unassigned_shards" : 1, // 未分配的分片
"delayed_unassigned_shards": 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50.0
}
10.别名切换
POST /_aliases
{
"actions": [
{ "remove": { "index": "online_index", "alias": "my_index" }},
{ "add": { "index": "new_index", "alias": "my_index" }}
]
}
11.运行在线索引 (从kafka里面读取数据)
new_index 开始从kafka里面消费最新数据。由于之前的操作可能会有延时,需要等待几分钟才能同步到最新数据。
12.删除旧的索引
删除old_index
详细代码步骤如下
// 1.init
logger.info("初始化");
ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
logger.info("刷新索引名映射关系");
if (!indexContext.refreshIndexName()) {
throw new IndexException("刷新索引映射关系失败");
}
rebuildIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("初始化重建索引环境,当前重建索引名:" + rebuildIndexName);
logger.info("创建索引,索引名:" + rebuildIndexName);
boolean isCreate = false;
try {
isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
} catch (Throwable t) {
logger.info("创建索引失败,本次失败可以不处理,将会自动重试 ...");
}
logger.info("设置在线索引记录数据变更日志");
indexContext.startChangeLog();
// 2. 重建索引
logger.info("全量索引数据库上的数据 ...");
long startRebulidTime = System.currentTimeMillis();
rebuild();
logger.info(" ------ 完成全量索引数据库上的数据,对应索引" + rebuildIndexName + ",耗时" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
+ " 秒 ------ ");
// 3. 索引优化 -- 是否调到变更重放完毕后做优化
logger.info("优化索引 ...");
long startOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
logger.info(" ------ 完成" + rebuildIndexName + "索引优化,耗时 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
+ " 秒 ------ ");
// TODO 字符集设置
BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));
// 4. 重放变更日志
logger.info("重放本地数据变更日志[第一阶段] ...");
long startReplay1Time = System.currentTimeMillis();
int replayChangeLogCount = replayChangeLogFirst(logReader);
logger.info(" ------ 完成[第一阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
+ ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒 ------ ");
// 5. 暂停在线索引
logger.info("暂停在线索引");
indexContext.pauseOnlineIndex();
isPauseOnline.set(true);
// 6. 设置 在线索引只做索引更新 以及 关闭 change log
logger.info("停止变更日志");
indexContext.stopChangeLog();
// 7. 继续重放 change log
logger.info("重放本地数据变更日志[第二阶段] ...");
long startReplay2Time = System.currentTimeMillis();
replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
logger.error("变更日志,处于错误的状态,统计的日志行数:" + indexContext.getWriteChangeLogCount() + ", 但实际只有:" + replayChangeLogCount);
}
logger.info(" ------ 完成[第二阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
+ ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒 ------ ");
// 8. 删除变更日志, OnlineIndex.startChangeLog 有做环境清理,这里不执行
logger.info("简单优化索引 ...");
long startSimpleOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);
logger.info(" ------ 完成" + rebuildIndexName + "索引简单优化,耗时 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
+ " 秒 ------ ");
// 9. 设置副本数 (怀疑比较耗时~~~待确认)
logger.info("设置副本数 ...");
int replicas = 3;
if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
replicas = 1;
} else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
replicas = 2;
} else {
String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
if (NumberUtils.isNumber(replicasStr)) {
replicas = NumberUtils.toInt(replicasStr);
}
}
ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);
// 执行索引切换流程
// 预发、线上环境阻塞等待2分钟同步数据后,再执行索引切换和删除旧索引逻辑
try {
if(IDCUtil.isBuildOrProduction()){
Thread.sleep(120 * 1000);
}
} catch (InterruptedException e) {
}
// 10. 别名切换
logger.info("索引切换:将" + rebuildIndexName + "设置为线上索引");
if (!indexContext.switchIndex(rebuildIndexName)) {
throw new IndexException("索引切换失败:将" + rebuildIndexName + "设置为线上索引失败");
}
// 11. 运行在线索引
logger.info("运行在线索引");
indexContext.keepRuningOnlineIndex();
isPauseOnline.set(false);
// 12. 删除原有在线索引
String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("删除原有在线索引,索引名:" + oldOnlineIndexName);
if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
throw new IndexException("删除索引失败,索引名:" + oldOnlineIndexName);
}
思考
如果只是简单地新建索引,完全可以这样做(使用不同的消费组)
1.记录时间戳
2.全量索引数据的数据
3.根据前面的时间戳找到kafka中的下标,下标得时间戳必须 < 记录的时间戳
4.根据上一步的下标开始索引数据
部署新的客户端服务调用新的es集群,检查业务是否正常。对站内查询检查搜索结果是否一致,对统计类查询查看统计结果是否一致。
上线,观察业务是否稳定。
释放旧的es集群的资源。
es升级这份工作是两年之前做的,现在来进行总结,部分细节可能会有疏漏。但是总结起来,依然后很多收获,从架构,代码细节上都有改进的空间。es重建代码可以做得更通用,然后开源出来。
尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub
我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘
我实际上是在尝试使用RVM在我的OSX10.7.5上更新ruby,并在输入以下命令后:rvminstallruby我得到了以下回复:Searchingforbinaryrubies,thismighttakesometime.Checkingrequirementsforosx.Installingrequirementsforosx.Updatingsystem.......Errorrunning'requirements_osx_brew_update_systemruby-2.0.0-p247',pleaseread/Users/username/.rvm/log/138121
我最近决定从我的系统中卸载RVM。在thispage提出的一些论点说服我:实际上,我的决定是,我根本不想担心Ruby的多个版本。我只想使用1.9.2-p290版本而不用担心其他任何事情。但是,当我在我的Mac上运行ruby--version时,它告诉我我的版本是1.8.7。我四处寻找如何简单地从我的Mac上卸载这个Ruby,但奇怪的是我没有找到任何东西。似乎唯一想卸载Ruby的人运行linux,而使用Mac的每个人都推荐RVM。如何从我的Mac上卸载Ruby1.8.7?我想升级到1.9.2-p290版本,并且我希望我的系统上只有一个版本。 最佳答案
我发现自己需要这个。假设cart是一个包含用户列表的模型。defindex_of_itemcart.users.each_with_indexdo|u,i|ifu==current_userreturniendend获取此类关联索引的更简单方法是什么? 最佳答案 indexArray上的方法与您的index_of_item方法相同,例如cart.users.index(current_user)返回数组中第一个对象的索引==给obj。如果未找到匹配项,则返回nil。 关于ruby-on-
我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or
因此,当我遵循MichaelHartl的RubyonRails教程时,我注意到在用户表中,我们为:email属性添加了一个唯一索引,以提高find的效率方法,因此它不会逐行搜索。到目前为止,我们一直在根据情况使用find_by_email和find_by_id进行搜索。然而,我们从未为:id属性设置索引。:id是否自动索引,因为它在默认情况下是唯一的并且本质上是顺序的?或者情况并非如此,我应该为:id搜索添加索引吗? 最佳答案 大多数数据库(包括sqlite,这是RoR中的默认数据库)会自动索引主键,对于RailsMigration
假设我有一个可枚举对象enum,现在我想获取第三个项目。我知道一种通用方法是转换成数组,然后使用索引访问,如:enum.to_a[2]但这种方式会创建一个临时数组,效率可能很低。现在我使用:enum.each_with_index{|v,i|breakvifi==2}但这非常丑陋和多余。执行此操作最有效的方法是什么? 最佳答案 你可以使用take剥离前三个元素,然后剥离last从take给你的数组中获取第三个元素:third=enum.take(3).last如果您根本不想生成任何数组,那么也许:#Ifenumisn'tanEnum
我们有一个目前在Rails2.3.12版和Ruby1.8.7版上运行的应用程序。我们想将我们的应用程序更新到Rails4.0和Ruby2.1.0。我们有大约200个模型和150个Controller。我想知道升级过程需要多大的努力。您还可以提供升级可以遵循的步骤。我们应该先升级Ruby然后再升级Rails还是相反? 最佳答案 您想要实现的目标将是史诗般的努力。我无法为您提供分步说明,因为不可能在一个答案中涵盖所有情况。我建议不要同时升级Ruby和Rails,而是分步升级。升级本身的复杂性是巨大的,但只要您的应用程序具有合理的测试覆盖
在我的场景中,Logstash收到的系统日志行的“时间戳”是UTC,我们在Elasticsearch输出中使用事件“时间戳”:output{elasticsearch{embedded=>falsehost=>localhostport=>9200protocol=>httpcluster=>'elasticsearch'index=>"syslog-%{+YYYY.MM.dd}"}}我的问题是,在UTC午夜,Logstash在外时区(GMT-4=>America/Montreal)结束前将日志发送到不同的索引,并且索引在20小时(晚上8点)之后没有日志,因为“时间戳”是UTC。我们已