由于篇幅原因,本次的源码分析只限于Producer侧的发送消息的核心逻辑,我会通过流程图、代码注释、文字讲解的方式来对源码进行解释,后续应该会专门开几篇文章来做源码分析。这篇博客聊聊关于RocketMQ相关的东西,主要聊的点有RocketMQ的功能使用、RocketMQ的底层运行原理和部分核心逻辑的源码分析。至于我们为什么要用MQ、使用MQ能够为我们带来哪些好处、MQ在社区有哪些实现、社区的各个MQ的优劣对比等等,我在之前的文章《消息队列杂谈》已经聊过了,如果需要了解的话可以回过头去看看。
上图中,RocketMQ被标识为了一个单点,但事实上肯定不是如此,对于可以随时横向扩展的服务来说,生产者向MQ生产消息的数量也会随之而变化,所以一个合格成熟的MQ必然是要能够处理这种情况的;而且MQ自身需要做到高可用,否则一旦这个单点宕机,那所有存储在MQ中的消息就全部丢失且无法找回了。
所以在实际的生产环境中,肯定是会部署一个MQ的集群。而在RocketMQ中,这个“实例”有个专属名词,叫做Broker。并且,每个Broker都会部署一个Slave Broker,Master Broker会定时的向Slave Broker同步数据,形成一个Broker的主从架构。
那么问题来了,在微服务的架构中,部署的服务也存在多实例部署的情况,服务之间相互调用是通过注册中心来获取对应服务的实例列表的。
拿Spring Cloud举例,服务通过Eureka注册中心获取到某个服务的全部实例,然后交给Ribbon,Ribbon联动Eureka,从Eureka处获取到服务实例的列表,然后通过负载均衡算法选出一个实例,最后发起请求。
同理,此时MQ中存在多个Broker实例,那生产者如何得知MQ集群中有多少Broker实例呢?自己应该连接哪个实例?
首先我们直接排除在代码里Hard Code,具体原因我觉得应该不用再赘述了。RocketMQ是如何解决这个问题呢?这就是接下来我们要介绍的NameServer了。
有了NameServer,客户端启动之后会和NameServer交互,获取到当前RocketMQ集群中所有的Broker信息、路由信息。这样一来,生产者就知道自己需要连接的Broker信息了,就可以进行消息投递。
那么问题来了,如果在运行过程中,如果某个Broker突然宕机,NameServer会如何处理?
这需要提到RocketMQ的这续约机制和故障感知机制。Broker在完成向NameServer的注册之后,会每隔30秒向NameServer发送心跳进行续约;如果NameServer感知到了某个Broker超过了120秒都没有发送心跳,则会认为这个Broker不可用,将其从自己维护的信息中移除。
这套机制,和Spring Cloud中的Eureka的实现如出一辙。Eureka中的Service在启动之后也会向Eureka注册自己,这样一来其他的服务就可以向该服务发起请求,交换数据。Service每隔30秒会向Eureka发送心跳续约,如果某个Service超过了90秒没有发送心跳,Eureka就会认为该服务宕机,将其从Eureka维护的注册表中移除。
上面图中我聊到了多实例部署,这个多实例部署和微服务中的多实例部署还不太一样,微服务中,所有的服务都是无状态的,可以横向的扩展,而在RocketMQ中,每个Broker所存的数据可能都不一样。
我们来看一下RocketMQ的简单使用。
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
可以看到,Message的第一个参数,为当前这条消息指定了一个Topic,那Topic又是什么呢?
通过上图可以看出,同一个Topic的数据,被分成了好几份,分别存储在不同的Broker上,那RocketMQ为什么要这么实现?
首先,一个Topic中如果只有一个Queue,那么消费者在消费时的速度必然受到影响;而如果一个Topic有很多个Queue,那么Consumer就可以将消费操作同时进行,从而扛住更多的并发。
除此之外,单台机器的资源是有限的。一个Topic的消息量可能会非常之巨大,一台机器的磁盘很快就会被塞满。所以RocketMQ将一个Topic的数据分摊给了多台机器,进行分散存储。其本质上就是一个数据分片存储的一种机制。
所以我们知道了,发送到某个Topic的数据是分布式的存储在多个Broker中的MessageQueue上的。
这就是最底层的存储的方式,那么问题来了,Consumer来取消息的时候,Broker是如何从这一堆的Commit Log中找到相应的数据呢?众所周知,一提到磁盘的I/O操作,就会联想到耗时这两个字,而RocketMQ的一大特点就是高吞吐,看似很矛盾,RocketMQ是如何做的呢?
答案是ConsumeQueue,Broker在写入Commit Log的同时,还会将当前这条消息在Commit Log中的Offset、消息的Size和对应的Tag的Hash写入到ConsumeQueue文件中。每个Message Queue会有相对应的ConsumeQueue文件存储在磁盘上。
和Commit Log一样,一个ConsumeQueue包含了30W条消息,每条消息的大小为20字节,所以每个ConsumeQueue文件的大小约为5.72M;当其写满了之后,会再新建一个ConsumeQueue文件继续写入。
ConsumeQueue是一种逻辑队列,更是一种索引,让Consumer来消费的时候可以快速的从磁盘文件中定位到这条消息。
看到这你可能会想,上面提到的Tag又是个什么东西?
首先我们会初始化一个DefaultMQProducer,RocketMQ会给这个Producer一个默认的实现DefaultMQProducerImpl。然后producer.start()会启动一个线程池。
producer.send(msg)了,首先RocketMQ会调用checkMessage来检测发送的消息是否合法。
这些检测包含了待发送的消息是否为空,Topic是否为空、Topic是否包含了非法的字符串、Topic的长度是否超过了最大限制127,然后会去检查Body是否符合发送要求,例如msg的Body是否为空、msg的Body是否超过了最大的限制等等,这里消息的Body最大不能超过4M。
DefaultMQProducerImpl中的sendDefaultImpl默认实现,发送消息给Broker,默认的发送消息Timeout是3秒。
发送消息中,MQ会再次调用checkMessage对消息的合法性再次进行检查,然后就会去尝试获取Topic的详细信息。
所有的Topic的信息都会存在一个叫topicPublishInfoTable的 ConcurrentHashMap中,这个Map中Key就是Topic的字符串,而Value则是TopicPublishInfo。
这个TopicPublishInfo中就包含了之前在基础概念中提到的,从Broker中获取到的相应的元数据,其中就包含了关键的MessageQueue和集群元数据,其基础的结构如下。
messageQueueList包含了该Topic下的所有的MessageQueue,每个MessageQueue的所属Topic,每个MessageQueue所在的Broker的名称以及专属的queueId。
topicRouteData包含了该Topic下的所有的Queue、Broker相关的数据。
tryToFindTopicPublishInfo方法获取的,详细的注释我已经写在了下图中。
对于首次使用的Topic,在上面的Map肯定是不存在的。所以RocketMQ会将其加入到Map中去,并且调用方法updateTopicRouteInfoFromNameServer从NameServer处获取该Topic的元数据,将其一并写入Map。初次之外,还会将路由信息、Broker的详细信息分别放入topicRouteTable和brokerAddrTable中,这两个都是Producer维护在内存中的ConcurrentHashMap。
获取到了Topic的详细信息之后,接下来会确认一个发送的重试次数timesTotal,假设timesTotal为N,那么发送消息如果失败就会重试N次。不过当且仅当发送失败的时候才会进行重试,其余的case都不会,例如超时、或者没有选择到合适的MessageQueue。
这个重试的次数timesTotal受到参数communicationMode的影响;CommunicationMode有三个值,分别是SYNC、ASYNC和ONEWAY。RocketMQ默认的实现中,选择了SYNC同步。
通过代码我们可以看到,如果是communicationMode是SYNC的话,timesTotal的值为1+retryTimesWhenSendFailed,而retryTimesWhenSendFailed的值默认为2,代表在消息发送失败之后的重试次数。
这样一来,如果我们选择了SYNC的方式,Producer在发送消息的时候默认的重试次数就为3。不过当且仅当发送失败的时候才会进行重试,其余的case都不会。
selectOneMessageQueue来进行的Message Queue选择,该方法通过Topic的详细元数据和上次选择的MessageQueue所在的Broker,来决定下一个的选择。
上图就是MessageQueue最核心的、最底层的原则机制了。但是由于实际的业务情况十分复杂, RocketMQ在实现中还额外的做了很多的事情。
sendLatencyFaultEnable的值决定,其默认值是false,也就是默认是不开启的,从代码里我暂时没找到其开启的位置。
不过我们可以聊聊开启之后,会发生什么。它同样会开启for循环,次数为MessageQueue的数量,计算拿到确定的Queue之后,会通过内存的一张表faultItemTable去判断当前这个Broker是否可用,该表是每次发送消息的时候都会去更新它。
如果当前没有可用的Broker,则会触发其兜底的逻辑,再选择一个MessageQueue出来。
微信搜索关注【SH的全栈笔记】,回复【队列】获取MQ学习资料,包含基础概念解析和RocketMQ详细的源码解析,持续更新中。 如果你觉得这篇文章对你有帮助,还麻烦点个赞,关个注,分个享,留个言。
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,
Ⅰ软件测试基础一、软件测试基础理论1、软件测试的必要性所有的产品或者服务上线都需要测试2、测试的发展过程3、什么是软件测试找bug,发现缺陷4、测试的定义使用人工或自动的手段来运行或者测试某个系统的过程。目的在于检测它是否满足规定的需求。弄清预期结果和实际结果的差别。5、测试的目的以最小的人力、物力和时间找出软件中潜在的错误和缺陷6、测试的原则28原则:20%的主要功能要重点测(eg:支付宝的支付功能,其他功能都是次要的)80%的错误存在于20%的代码中7、测试标准8、测试的基本要求功能测试性能测试安全性测试兼容性测试易用性测试外观界面测试可靠性测试二、质量模型衡量一个优秀软件的维度①功能性功
ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear
我正在学习Ruby,遇到了inject。我正处于理解它的风口浪尖,但当我是那种需要真实世界的例子来学习一些东西的人时。我遇到的最常见的例子是人们使用inject来添加一个(1..10)范围的总和,我不太关心这个。这是一个任意的例子。在实际程序中我会用它做什么?我正在学习,所以我可以继续使用Rails,但我不必有一个以Web为中心的示例。我只需要一些我可以全神贯注的目标。谢谢大家。 最佳答案 inject有时可以通过它的“其他”名称reduce更好地理解。它是一个对Enumerable进行操作(迭代一次)并返回单个值的函数。它有许多有
(本文是网络的宏观的概念铺垫)目录计算机网络背景网络发展认识"协议"网络协议初识协议分层OSI七层模型TCP/IP五层(或四层)模型报头以太网碰撞路由器IP地址和MAC地址IP地址与MAC地址总结IP地址MAC地址计算机网络背景网络发展 是最开始先有的计算机,计算机后来因为多项技术的水平升高,逐渐的计算机变的小型化、高效化。后来因为计算机其本身的计算能力比较的快速:独立模式:计算机之间相互独立。 如:有三个人,每个人做的不同的事物,但是是需要协作的完成。 而这三个人所做的事是需要进行协作的,然而刚开始因为每一台计算机之间都是互相独立的。所以前面的人处理完了就需要将数据
目录0专栏介绍1平面2R机器人概述2运动学建模2.1正运动学模型2.2逆运动学模型2.3机器人运动学仿真3动力学建模3.1计算动能3.2势能计算与动力学方程3.3动力学仿真0专栏介绍?附C++/Python/Matlab全套代码?课程设计、毕业设计、创新竞赛必备!详细介绍全局规划(图搜索、采样法、智能算法等);局部规划(DWA、APF等);曲线优化(贝塞尔曲线、B样条曲线等)。?详情:图解自动驾驶中的运动规划(MotionPlanning),附几十种规划算法1平面2R机器人概述如图1所示为本文的研究本体——平面2R机器人。对参数进行如下定义:机器人广义坐标
网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.
1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>
参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍 介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。 内容有: ①:Hub模型的方法介绍 ②:服务器端代码介绍 ③:前端vue3安装并调用后端方法 ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke() 去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on