这样一来,如果某个Broker所在的机器意外宕机,而且刚好MessageQueue中的数据还没有持久化到磁盘,那么该Topic下的这部分消息就会完全丢失。此时如果有备份的话,MQ就可以继续对外提供服务。
为什么还会出现没有持久化到磁盘的情况呢?现在的OS当中,程序写入数据到文件之后,并不会立马写入到磁盘,因为磁盘I/O是非常耗时的操作,在计算机来看是非常慢的一种操作。所以写入文件的数据会先写入到OS自己的缓存中去,然后择机异步的将Buffer中的数据刷入磁盘。
通过多副本冗余的机制,使得RocketMQ具有了高可用的特性。除此之外,分布式存储能够应对后期业务大量的数据存储。如果不使用分布式进行存储,那么随着后期业务发展,消息量越来越大,单机是无论如何也满足不了RocketMQ消息的存储需求的。如果不做处理,那么一台机器的磁盘总有被塞满的时候,此时的系统就不具备可伸缩的特性,也无法满足业务的使用要求了。
但是这里的可伸缩,和微服务中的服务可伸缩还不太一样。因为在微服务中,各个服务是无状态的。而Broker是有状态的,每个Broker上存储的数据都不太一样,因为Producer在发送消息的时候会通过指定的算法,从Message Queue列表中选出一个MessageQueue发送消息。
如果不是很理解这个横向扩展,那么可以把它当成Redis的Cluster,通过一致性哈希,选择到Redis Cluster中的具体某个节点,然后将数据写入Redis Master中去。如果此时想要扩容很方便,只需要往Redis Cluster中新增Master节点就好了。
所以,数据分布式的存储本质上是一种数据分片的机制。在此基础上,通过冗余多副本,达成了高可用。
那RocketMQ的主从Broker是如何进行数据同步的呢?Broker启动的时候,会启动一个定时任务,定期的从Master Broker同步全量的数据。
这块可以先不用纠结,后面我们会通过源码来验证这个主从同步逻辑。
上面提到了Broker会部署很多个实例,那么既然多实例部署,那必然会存在一个问题,客户端是如何得知自己是连接的哪个服务器?如何得知对应的Broker的IP地址和端口?如果某个Broker突然挂了怎么办?
获取Topic的方法源码在上面已经给出来了,首先会从内存中维护的一份Map中获取数据。顺带一提,这里的Map是ConcurrentHashMap,是线程安全的,和Golang中的Sync.Map类似。
当然,首次发送的话,这个Map肯定是空的,此时会调用NameServer的接口,通过Topic去获取详情的Topic数据,此时会在上面的方法中将其加入到Map中去,这样一来下次再往该Topic发送消息就能够直接从内存中获取。这里就是简单的实现的缓存机制 。
从方法名称来看,是通过Topic获取路由数据。实际上该方法,通过调用NameServer提供的API,更新了两部分数据,分别是:

通过源码可以看到,就包含了该Topic下所有Broker下的Message Queue相关的数据、所有Broker的地址信息。
什么叫发送到哪儿?开篇提到过一个Topic下会被分为很多个MessageQueue,「发送到哪儿」指的就是具体发送到哪一个Message Queue中去。
核心逻辑,用大白话讲就是将一个随机数和Message Queue的容量取模。这个随机数存储在Thread Local中,首次计算的时候,会直接随机一个数。
此后,都直接从ThreadLocal中取出该值,并且+1返回,拿到了MessageQueue的数量和随机数两个关键的参数之后,就会执行最终的计算逻辑。
接下来,我们来看看选择Message Queue的方法SelectOneMessageQueue都做了什么操作吧。
可以看到,主逻辑被变量sendLatencyFaultEnable分为了两部分。
可以看到,如果调用Broker信息发生了异常,那么就会调用updateFault这个方法,来更新Broker的Aviable情况。注意这个参数isolation的值为true。接下来我们从源码级别来验证上面说的退避3000ms的事实。
可以看到,isolation值是true,则duration通过三元运算符计算出来结果为30000,也就是30秒。所以我们可以得出结论,如果发送消息抛出了异常,那么直接会将该Broker设置为30秒内不可用。
而如果只是发送延迟较高,则会根据如下的map,根据延迟的具体时间,来判断该设置多少时间的不可用。
例如,如果上次请求的latency超过550ms,就退避3000ms;超过1000,就退避60000;
可以看到判断了是否开启了Dleger,默认是不开启的。所以就会执行其中的逻辑。
刚好我们就看到了,里面有Rocket主从同步数据的相关代码。
如果当前Broker节点的角色是Slave,则会启动一个周期性的定时任务,定期(也就是10秒)去Master Broker同步全量的数据。同步的数据包括:
可以看到,这里会判断是否需要进行注册。通过上面的截图可以看到,此时forceRegister的值为true,而是否要注册,决定权就交给了needRegister
为什么需要判断是否需要注册呢?因为Broker一旦注册到了NameServer之后,由于Producer不停的在写入数据,Consumer也在不停的消费数据,Broker也可能因为故障导致某些Topic下的Message Queue等关键的路由信息发生变动。
这样一来,NameServer中的数据和Broker中的数据就会不一致。
接下来,我们从源码层面验证这个逻辑。关键的逻辑我在图中也标注了出来。
可以看到, 就是通过对比Broker中的数据版本和NameServer中的数据版本来实现的。这个版本,注册的时候会写到注册的数据中存入NameServer中。
这里由于是有多个,所以RocketMQ用线程池来实现了多线程操作,并且用CountDownLatch来等待所有的返回结果。经典的用空间换时间,Golang里面也有类似的操作,那就是sync.waitGroup。
关于任何一个数据不匹配,都会进行重新注册的事实,我们也从源码层面来验证一下。
可以看到,如果任何一台NameServer的数据发生了Change,都会break,返回true。
这里的结果列表使用的是CopyOnWriteList来实现的。
因为这里是多线程去执行的判断逻辑,而正常的列表不是线程安全的。CopyOnWriteArrayList之所以是线程安全的,这归功于COW(Copy On Write),读请求时共用同一个List,涉及到写请求时,会复制出一个List,并在写入数据的时候加入独占锁。比起直接对所有操作加锁,读写锁的形式分离了读、写请求,使其互不影响,只对写请求加锁,降低了加锁的次数、减少了加锁的消耗,提升了整体操作的并发。
可以看到,Topic的数据分为了两部分,一部分是核心的逻辑,另一部分是DataVersion,也就是我们刚刚一直提到的数据版本。
然后给出结论,Producer发送的消息是存储在一种叫commit log的文件中的,Producer端每次写入的消息是不等长的,当该CommitLog文件写入满1G,就会新建另一个新的CommitLog,继续写入。此次采取的是顺序写入。
那么问题来了,Consumer来消费的时候,Broker是如何快速找到对应的消息的呢?我们首先排除遍历文件查找的方法, 因为RocketMQ是以高吞吐、高性能著称的,肯定不可能采取这种对于很慢的操作。那RocketMQ是如何做的呢?
答案是ConsumerQueue
当该ConsumerQueue文件写满了之后,就会再新建一个ConsumerQueue文件,继续写入。
所以,ConsumerQueue文件可以看成是CommitLog文件的索引。
为什么?道理很简单,RocketMQ支持的顺序消费,是指的分区顺序性,也就是在单个MessageQueue中,消息是具有顺序性的,而如果多台Consumer去消费同一个MessageQueue,就很难去保证顺序消费了。由于有很多个Consumer在消费多个MessageQueue,所以为了不出现数据倾斜,也为了资源的合理分配利用,在Producer发送消息的时候,需要尽可能的将消息均匀的分发给多个MessageQueue。 同时,上面那种一个Consumer消费了2个MessageQueue的情况,万一这台Consumer挂了呢?这两个MessageQueue不就没人消费了? 以上两种情况分别是Producer端的负载均衡、Consumer端的负载均衡。
默认是20秒执行一次。具体的代码如下。
首先会均匀的按照类似分页的思想,将MessageQueue分配给Consumer,如果分配的不均匀,则会依次的将剩下的MessageQueue按照排序的顺序,从上往下的分配。所以在这里Consumer 1被分配到了4个MessageQueue,而Consumer 2被分配到了3个MessageQueue。
Rebalance完了之后,会将结果和Consumer缓存的数据做对比,移除不在ReBalance结果中的MessageQueue,将原本没有的MessageQueue给新增到缓存中。
验证了我们刚刚说的获取了本地的Topic数据缓存,和从Broker端拉取所有的ConsumerID。
接下来是验证刚说的排序逻辑。
接下来是看判断结果是否发生了变化的源码。
可以看到,Consumer通知Broker策略,其本质上就是发送心跳,将更新后的数据通过心跳发送给所有的Broker。
而消费模式的不同,会影响到管理offset的具体实现。
可以看到,当消费模型是广播模式时,Offset的持久化管理会使用实现LocalFileOffsetStorage
当消费模式是集群消费时,则会使用RemoteBrokerOffsetStore。
具体原因是什么呢?首先我们得知道广播模式和集群模式的区别在哪儿:
不同的消费方式会采取不同的底层实现,配置完成之后就会调用start。
这个线程做了什么事呢?它会不停的从一个维护在内存中的Queue中获取一个在写入的时候就构建好的PullRequest对象,调用具体实现去不停的拉取消息了。
这是在AutoCommit下,如果消费失败了的处理逻辑。会记录一个失败的TPS,然后这里有一个非常关键的逻辑,那就是checkReconsumeTimes。
如果当前消息的重试次数,如果大于了最大的重试消费次数,就会把消费发回给Broker。那最大重试次数是如何定义的。
如果值为-1,那么最大次数就是MAX_VALUE,也就是2147483647。这里有点奇怪啊,按照我们平常的认知,难道不是重试16次吗?然后就看到了很骚的一句注释。
-1 means 16 times,这代码确实有点,一言难尽。
然后,如果超过了最大的次数限制,就会将该消息调用Prodcuer的默认实现,将其发送到死信队列中。当然,死信队列也不是什么特殊的存在,就是一个单独的Topic而已。
通过getRetryTopic来获取的,默认是给当前的ConsumerGroup名称加上一个前缀。
本篇文章已放到我的 Github github.com/sh-blog 中,欢迎 Star。微信搜索关注【SH的全栈笔记】,回复【队列】获取MQ学习资料,包含基础概念解析和RocketMQ详细的源码解析,持续更新中。 如果你觉得这篇文章对你有帮助,还麻烦点个赞,关个注,分个享,留个言。
我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我正在使用ruby1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\
简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,
Ⅰ软件测试基础一、软件测试基础理论1、软件测试的必要性所有的产品或者服务上线都需要测试2、测试的发展过程3、什么是软件测试找bug,发现缺陷4、测试的定义使用人工或自动的手段来运行或者测试某个系统的过程。目的在于检测它是否满足规定的需求。弄清预期结果和实际结果的差别。5、测试的目的以最小的人力、物力和时间找出软件中潜在的错误和缺陷6、测试的原则28原则:20%的主要功能要重点测(eg:支付宝的支付功能,其他功能都是次要的)80%的错误存在于20%的代码中7、测试标准8、测试的基本要求功能测试性能测试安全性测试兼容性测试易用性测试外观界面测试可靠性测试二、质量模型衡量一个优秀软件的维度①功能性功
ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear
我正在使用ruby2.1.0我有一个json文件。例如:test.json{"item":[{"apple":1},{"banana":2}]}用YAML.load加载这个文件安全吗?YAML.load(File.read('test.json'))我正在尝试加载一个json或yaml格式的文件。 最佳答案 YAML可以加载JSONYAML.load('{"something":"test","other":4}')=>{"something"=>"test","other"=>4}JSON将无法加载YAML。JSON.load("
我想用Nokogiri解析HTML页面。页面的一部分有一个表,它没有使用任何特定的ID。是否可以提取如下内容:Today,3,455,34Today,1,1300,3664Today,10,100000,3444,Yesterday,3454,5656,3Yesterday,3545,1000,10Yesterday,3411,36223,15来自这个HTML:TodayYesterdayQntySizeLengthLengthSizeQnty345534345456563113003664354510001010100000344434113622315