作者:谢泽华
众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。
同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。
结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对ES集群在跨机房同步过程中的方案和经验进行介绍和总结。
1.部署Logstash在金山云机房上,Logstash启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的ES集群在相同的VPC
2.Logstash需要配置大网访问权限,保证Logstash和ES原集群和目标集群互通。
3.数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。
4.增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。
Logstash分为三个部分input 、filter、ouput:
1.input处理接收数据,数据可以来源ES,日志文件,kafka等通道.
2.filter对数据进行过滤,清洗。
3.ouput输出数据到目标设备,可以输出到ES,kafka,文件等。
对于T时刻的数据,先使用Logstash将T以前的所有数据迁移到有孚机房京东云ES,假设用时∆T
对于T到T+∆T的增量数据,再次使用logstash将数据导入到有孚机房京东云的ES集群
重复上述步骤2,直到∆T足够小,此时将业务切换到华为云,最后完成新增数据的迁移
适用范围:ES的数据中带有时间戳或者其他能够区分新旧数据的标签
1.创建ECS和安装JDK忽略,自行安装即可
2.下载对应版本的Logstash,尽量选择与Elasticsearch版本一致,或接近的版本安装即可
1) 源码下载直接解压安装包,开箱即用
2)修改对内存使用,logstash默认的堆内存是1G,根据ECS集群选择合适的内存,可以加快集群数据的迁移效率。
Logstash会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。
以下提供创建索引的python脚本,用户可以使用该脚本创建需要的索引。
create_mapping.py文件是同步索引的python脚本,config.yaml是集群地址配置文件。
注:使用该脚本需要安装相关依赖
yum install -y PyYAML
yum install -y python-requests
拷贝以下代码保存为 create_mapping.py:
import yaml
import requests
import json
import getopt
import sys
def help():
print
"""
usage:
-h/--help print this help.
-c/--config config file path, default is config.yaml
example:
python create_mapping.py -c config.yaml
"""
def process_mapping(index_mapping, dest_index):
print(index_mapping)
# remove unnecessary keys
del index_mapping["settings"]["index"]["provided_name"]
del index_mapping["settings"]["index"]["uuid"]
del index_mapping["settings"]["index"]["creation_date"]
del index_mapping["settings"]["index"]["version"]
# check alias
aliases = index_mapping["aliases"]
for alias in list(aliases.keys()):
if alias == dest_index:
print(
"source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
del index_mapping["aliases"][alias]
if index_mapping["settings"]["index"].has_key("lifecycle"):
lifecycle = index_mapping["settings"]["index"]["lifecycle"]
opendistro = {"opendistro": {"index_state_management":
{"policy_id": lifecycle["name"],
"rollover_alias": lifecycle["rollover_alias"]}}}
index_mapping["settings"].update(opendistro)
# index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
del index_mapping["settings"]["index"]["lifecycle"]
print(index_mapping)
return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
headers = {'Content-Type': 'application/json'}
create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
if create_resp.status_code != 200:
print(
"create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
print(create_resp.text)
with open(source_index + ".json", "w") as f:
json.dump(mapping, f)
def main():
config_yaml = "config.yaml"
opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
for opt_name, opt_value in opts:
if opt_name in ('-h', '--help'):
help()
exit()
if opt_name in ('-c', '--config'):
config_yaml = opt_value
config_file = open(config_yaml)
config = yaml.load(config_file)
source = config["source"]
source_user = config["source_user"]
source_passwd = config["source_passwd"]
source_auth = None
if source_user != "":
source_auth = (source_user, source_passwd)
dest = config["destination"]
dest_user = config["destination_user"]
dest_passwd = config["destination_passwd"]
dest_auth = None
if dest_user != "":
dest_auth = (dest_user, dest_passwd)
print(source_auth)
print(dest_auth)
# only deal with mapping list
if config["only_mapping"]:
for source_index, dest_index in config["mapping"].iteritems():
print("start to process source index" + source_index + ", target index: " + dest_index)
source_url = source + "/" + source_index
response = requests.get(source_url, auth=source_auth)
if response.status_code != 200:
print("*** get ElasticSearch message failed. resp statusCode:" + str(
response.status_code) + " response is " + response.text)
continue
mapping = response.json()
index_mapping = process_mapping(mapping[source_index], dest_index)
dest_url = dest + "/" + dest_index
put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
print("process source index " + source_index + " to target index " + dest_index + " successed.")
else:
# get all indices
response = requests.get(source + "/_alias", auth=source_auth)
if response.status_code != 200:
print("*** get all index failed. resp statusCode:" + str(
response.status_code) + " response is " + response.text)
exit()
all_index = response.json()
for index in list(all_index.keys()):
if "." in index:
continue
print("start to process source index" + index)
source_url = source + "/" + index
index_response = requests.get(source_url, auth=source_auth)
if index_response.status_code != 200:
print("*** get ElasticSearch message failed. resp statusCode:" + str(
index_response.status_code) + " response is " + index_response.text)
continue
mapping = index_response.json()
dest_index = index
if index in config["mapping"].keys():
dest_index = config["mapping"][index]
index_mapping = process_mapping(mapping[index], dest_index)
dest_url = dest + "/" + dest_index
put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
print("process source index " + index + " to target index " + dest_index + " successed.")
if __name__ == '__main__':
main()
配置文件保存为config.yaml:
# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"
# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true
# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
mapping:
source_index: dest_index
以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。
索引同步完成可以取目标集群的kibana上查看或者执行curl查看索引迁移情况:
GET _cat/indices?v
Logstash配置位于config目录下。
用户可以参考配置修改Logstash配置文件,为了保证迁移数据的准确性,一般建议建立多组Logstash,分批次迁移数据,每个Logstash迁移部分数据。
配置集群间迁移配置参考:
input{
elasticsearch{
# 源端地址
hosts => ["ip1:port1","ip2:port2"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
# 需要迁移的索引列表,以逗号分隔,支持通配符
index => "a_*,b_*"
# 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
docinfo=>true
slices => 10
size => 2000
scroll => "60m"
}
}
filter {
# 去掉一些logstash自己加的字段
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
# 目的端es地址
hosts => ["http://ip:port"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
# 目的端索引名称,以下配置为和源端保持一致
index => "%{[@metadata][_index]}"
# 目的端索引type,以下配置为和源端保持一致
document_type => "%{[@metadata][_type]}"
# 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata => true }}
}
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html
PUT _ingest/pipeline/gmt_created_at
{
"description": "Adds gmt_created_at timestamp to documents",
"processors": [
{
"set": {
"field": "_source.gmt_created_at",
"value": "{{_ingest.timestamp}}"
}
}
]
}
GET _ingest/pipeline/*
PUT index_xxxx/_settings
{
"settings": {
"index.default_pipeline": "gmt_created_at"
}
}
GET index_xxxx/_settings
schedule-migrate.conf
index:可以使用通配符的方式
query: 增量同步的DSL,统一gmt_create_at为增量同步的特殊标记
schedule: 每分钟同步一把,"* * * * *"
input {
elasticsearch {
hosts => ["ip:port"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
index => "index_*"
query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
size => 5000
scroll => "5m"
docinfo => true
schedule => "* * * * *"
}
}
filter {
mutate {
remove_field => ["source", "@version"]
}
}
output {
elasticsearch {
# 目的端es地址
hosts => ["http://ip:port"]
# 安全集群配置登录用户名密码
user => "username"
password => "password"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata => true }}
}
mapping中存在join父子类型的字段,直接迁移报400异常
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400,
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>],
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400,
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse",
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}
https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183
结合业务特征,通过在filter中加入小量的ruby代码,将_routing的值取出来,放回logstah event中,由此问题得以解决。
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现
ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear
类似的问题,但对于java,Keepingi18nresourcessynced如何保持i18nyamllocals的key同步?即,当将key添加到en.yml时,如何将它们添加到nb.yml或ru.yml?如果我在my_title:"atitle"旁边添加键my_label:"sometextinenglish"我想把它给我的其他本地人我指定,因为我不能做所有的翻译,它应该回到其他语言的英语例如en.ymlsomegroup:my_tile:"atitleinenglish"my_label:"sometextinenglish"othergroup:...我想发出命令,将整个键和
最近在工作中,看到一些新手测试同学,对接口测试存在很多疑问,甚至包括一些从事软件测试3,5年的同学,在聊到接口时,也是一知半解;今天借着这个机会,对接口测试做个实战教学,顺便总结一下经验,分享给大家。计划拆分成4个模块跟大家做一个分享,(接口测试、接口基础知识、接口自动化、接口进阶)感兴趣的小伙伴记得关注,希望对你的日常工作和求职面试,带来一些帮助。注:文章较长有5000多字,希望小伙伴们认真看完,当然有些内容对小白同学不是太友好,如果你需要详细了解其中的一些概念或者名词,请在文章之后留言,后续我将针对大家的疑问,整理输出一些大家感兴趣的文章。随着开发模式的迭代更新,前后端分离已不是新的概念,
文章目录查看ES信息查看节点信息查看分片信息实际场景下ES分片及副本数量应该怎么分关于ES的灵活使用查看ES信息查看版本kibana:GET/查看节点信息GET/_cat/nodes?v解释:ip:集群中节点的ip地址;heap.percent:堆内存的占用百分比;ram.percent:总内存的占用百分比,其实这个不是很准确,因为buff/cache和available也被当作使用内存;cpu:cpu占用百分比;load_1m:1分钟内cpu负载;load_5m:5分钟内cpu负载;load_15m:15分钟内cpu负载;node.role:上图的dilmrt代表全部权限master:*代表
elasticsearch查看当前集群中的master节点是哪个需要使用_cat监控命令,具体如下。查看方法es主节点确定命令,以kibana上查看示例如下:GET_cat/nodesv返回结果示例如下:ipheap.percentram.percentcpuload_1mload_5mload_15mnode.rolemastername172.16.16.188529952.591.701.45mdi-elastic3172.16.16.187329950.990.991.19mdi-elastic2172.16.16.231699940.871.001.03mdi-elastic4172
目录FIFO一.自定义同步FIFO1.1代码设计1.2Testbech1.3行为仿真***学习位宽计算函数$clog2()***$clog2()系统函数使用,可以不关注***分布式资源或者BLOCKBRAM二.异步FIFO2.1在FIFO判满的时候有两种方式:2.2异步FIFO为什么要使用格雷码2.2.1介绍格雷码2.2.2格雷码在异步FIFO中的应用2.2.2格雷码判满2.4二进制与格雷码之间的转换2.4.1二进制码转换为格雷码的方法2.4.2格雷码转换为二进制码的方法2.3实现框图2.5实现及仿真代码2.6仿真图验证2.7结论FIFO 这篇更多的是记录FIFO学习,参考了众多优秀的文章,
运行有问题或需要源码请点赞关注收藏后评论区留言一、利用ContentResolver读写联系人在实际开发中,普通App很少会开放数据接口给其他应用访问。内容组件能够派上用场的情况往往是App想要访问系统应用的通讯数据,比如查看联系人,短信,通话记录等等,以及对这些通讯数据及逆行增删改查。首先要给AndroidMaifest.xml中添加响应的权限配置 下面是往手机通讯录添加联系人信息的例子效果如下分成三个步骤先查出联系人的基本信息,然后查询联系人号码,再查询联系人邮箱代码 ContactAddActivity类packagecom.example.chapter07;importandroid
📝学技术、更要掌握学习的方法,一起学习,让进步发生👩🏻作者:一只IT攻城狮。💐学习建议:1、养成习惯,学习java的任何一个技术,都可以先去官网先看看,更准确、更专业。💐学习建议:2、然后记住每个技术最关键的特性(通常一句话或者几个字),从主线入手,由浅入深学习。❤️《SpringCloud入门实战系列》解锁SpringCloud主流组件入门应用及关键特性。带你了解SpringCloud主流组件,是如何一战解决微服务诸多难题的。项目demo:源码地址👉🏻SpringCloud入门实战系列不迷路👈🏻:SpringCloud入门实战(一)什么是SpringCloud?SpringCloud入门实战