作者:vivo 互联网存储技术团队-Luo Mingbo、中间件团队- Liu Runyun
本文根据“2022 vivo开发者大会"现场演讲内容整理而成。
本文主要介绍超大数据规模场景下分布式消息中间件在vivo的应用实践。
在线业务侧主要从RocketMQ集群部署架构、平台系统架构、日常运维操作平台、监控告警一体化实践以及vivo如何通过建设AMQP消息网关的方式完成所有在线业务服务从RabbitMQ到RocketMQ的业务无感迁移,实现了在线业务消息中间件组件的统一。
大数据侧主要从资源隔离、流量均衡、智能动态限流、集群治理四个维度介绍Kafka在vivo的最佳实践以及Kafka核心技术架构在超大数据规模场景下的缺陷以及未来对Pulsar组件的长线规划和建设。
在技术选型上,我们从吞吐量、功能特性、生态集成、开源活跃等多个维度对比了当前主流的分布式消息中间件,最终在线业务侧我们选择基于RocketMQ构建消息平台,依托RocketMQ丰富的功能特性满足业务间削峰、解耦、异步化的需求。
大数据侧我们选择具备高并发、高可用、低延迟、高吞吐能力的分布式消息中间件Kafka。构建超大数据规模处理能力的统一数据接入服务和实时数仓服务。Kafka组件作为统一数据接入服务,是大数据全链路中的咽喉要道,是大数据生态体系建设中不可或缺的重要组件之一。
运营指标方面目前大数据业务侧Kafka集群接入项目数百、接入规模方面Topic数量达到数万、集群日均处理消息达数十万亿条、可用性保障99.99%、单机日均处理消息达数百亿条。
在线业务侧RocketMQ集群接入项目数百、接入规模方面接入数千服务、集群日均处理消息达数百亿条、可用性保障100%,发送平均耗时<1ms。
首先我们看下Kafka的官网定义及发展历史,Kafka是由Apache软件基金会开源的一个流处理平台,是一种高吞吐量的分布式发布订阅消息系统。具有高吞吐、低延迟、高并发、高可用、高可扩等特性。
Kafka是由LinkedIn公司在2010年开源,2011年交由Apache软件基金会进行孵化,2012年成为Apache软件基金会的顶级开源项目。
在超大数据规模场景下我们会面临以下几个问题?
如何规划资源隔离保证核心业务、高优业务、一般业务之间相互不受影响?
如何保证集群内部节点间流量均衡,降低单节点或部分节点流量差异太大带来的资源浪费?
超大数据规模场景下如何进行限流保障集群的稳定性并尽可能降低对业务可用性的影响?
集群长期运行,客户端版本多样,如何持续保障集群的高可用性?
下面我将从资源隔离、流量均衡、智能动态限流、集群治理四个维度和大家一起交流Kafka在vivo的最佳实践。
资源隔离的核心作用在于避免业务与业务之间的相互影响,但隔离粒度、资源利用率、运维成本之间如何进行权衡,是我们需要思考的重点。隔离粒度太粗会导致隔离效果不佳,隔离粒度太细会导致资源利用率较低、运维成本增加。
那vivo在Kafka集群资源隔离上是如何平衡三者关系的呢?
首先我们根据业务属性、业务线两个维度进行集群维度的隔离,例如我们在集群划分上分为了商业化专用集群,监控专用集群,日志专用集群等。在集群维度做了机器资源的物理隔离。
同时我们在集群内部引入了资源组的概念。同一个集群内部可以包含多个资源组。每个资源组可以为多个业务提供服务。资源组与资源组之间相互独立。
上图中右上图是我们没有引入资源组概念时集群内部不同业务Topic分区的分散情况,大家可以看到业务A和业务B的Topic分区分散到集群内的所有broker上,若业务A的流量突增可能会造成业务B受到影响,右下图是我们引入资源组概念后不同业务Topic分区的分散情况,可以看到不同业务的topic分区只会分配到自己业务所属的资源组内,即使业务A的流量突增导致机器不可用也不会对业务B造成影响。
引入资源组概念后让我们能在集群内部实现机器资源的逻辑隔离。所以我们在资源隔离方面采用了物理隔离和逻辑隔离两种方式相结合,实现了在超大数据规模场景下Kafka集群的资源隔离方案。
流量均衡的核心作用在于充分利用集群内部资源,提升资源利用率。Kafka服务作为一个有状态的服务,Kafka在技术架构设计上Topic分区与节点绑定,不支持分区同一副本数据在磁盘和节点维度分散存储。对分区的读写请求都由分区Leader所在节点进行处理。所以Kafka集群流量均衡的本质是Topic分区的分散均衡。
在流量均衡方面我们做两期的建设,第一期我们在分区分散均衡算法上引入机器的实时出入流量、cpu负载、磁盘存储等指标作为负载因子生成分区迁移计划。执行分区迁移后达到流量均衡的目的。流量均衡一期功能上线后我们将资源组内节点间流量差异从数百兆/s降低到数十兆/s。随着集群数据规模的持续增加,我们发现数十兆/s的流量差异依然会造成资源浪费。
所以在流量均衡二期功能建设上我们增加了分区分散均衡、Leader分散均衡、副本分散均衡、磁盘均衡等Kafka元数据指标作为负载因子生成Kafka分区迁移计划,并在分区迁移执行上增加了多种迁移提交策略。流量均衡二期功能上线后我们将资源组内节点间流量差异从数十兆/s降低到十兆以内/s。
上图是我们流量均衡一期功能上线前后资源组内节点的流量监控面板,可以看到一期功能上线前资源组内节点间的流量偏差在数百兆/s。一期功能上线后资源组内节点间流量偏差在数十兆/s以内,资源组内节点间流量偏差降低75%。极大提升了服务端的资源利用率。
上图是我们流量均衡二期功能上线前后资源组内节点的入出流量监控面板,可以看到节点间入出流量偏差从数十兆/s降低到十兆以内/s,资源组内节点间流量偏差降低80%。效果也是非常明显。
限流的本质是限制客户端的流量突增以确保服务端的可用性。避免客户端的流量突增导致服务端整体不可用。限流的粒度,限流阈值的设定,资源利用率、服务端稳定性之间应该如何做权衡呢?是我们需要思考的重点。限流粒度太粗会导致限流效果不佳,当大部分业务同时流量突增会对服务端的稳定性带来风险。限流粒度太细服务端应对客服端流量突增能力不足,限流阈值设置太大会给服务端稳定性带来风险,限流阈值设置太小会导致服务端资源利用率较低。
限流方面,
首先我们采用多平台联合诊断机制根据项目实际生产数据情况判别是否需要进行流量调整,计算调整后的限流阈值。其中多平台包含(JMX统一指标采集平台,统一监控平台、统一告警平台、Kafka集群管理平台等)。
第二、智能分析Kafka集群服务资源负载情况,计算各资源剩余情况。确定是否可以进行阈值调整并结合客户端实际生产数据情况计算阈值调整到多少合适。
第三、自动实时调整限流阈值。
通过以上三步实现智能动态限流方案。解决了限流粒度、限流阈值设定、资源利用率、Kafka集群可用性四者之间的平衡关系。
实现智能动态限流后给我们带来以下几点明显的收益。
大大提升Kafka集群服务端应对客户端流量突增的能力。
利用项目错峰的方式进一步提升Kafka集群的资源利用率。
智能化自动调整项目限流阈值无需人工介入,大大降低Kafka集群在超大数据规模场景下的运维成本。
动态根据服务端负载情况调整项目限流阈值,尽可能减小限流对业务可用性的影响。
Kafka集群元数据统一由ZooKeeper集群管理,元数据信息永久有效永不过期,元数据的下发由Kafka Controller节点统一下发,随着业务的不断发展,数据规模的不断增加,集群内部Topic的数量达到万级,分区数量达到数十万级。元数据治理能有效避免元数规模给Kafka集群稳定性带来的影响。随着接入的服务、Kafka用户越来越多,正确的使用Kafka 客户端也能大大提升Kafka服务端的稳定性和资源利用率。Kafka分区与磁盘目录绑定,创建Topic、Topic分区扩容时根据Topic流量合理设置Topic分区数能有效避免单机或单盘性能瓶颈成为集群整体的性能瓶颈。
vivo在Kafka集群治理方面实现了节点流量偏差治理、Topic元数据治理、Topic分区数据倾斜治理、Topic超大分区治理、Topic消费延迟治理等方案为Kafka集群的高可用性保驾护航。
vivo Kafka消息中间件团队在三年时间内,根据实际的业务场景和生产数据规模沉淀了较多的实践经验。例如在高可用/高可扩方面实现了机架感知、弹性伸缩、数据压缩等能力建设,在监控告警方面提供了用户限流告警、Topic流量突增告警、消费延迟告警、Leader实时监控告警,多平台联合故障感知告警等能力建设。我们为Kafka集群做了很多的扩展能力建设,那解决了Kafka集群在超大数据规模场景下的所有问题了吗?答案是否定的。
接下来我们一起看看Kafka集群在超大数据规模场景下面临的新挑战。
由Kafka架构设计所带来的一些痛点无法通过扩展能力解决,并且Kafka架构设计上分区同一副本数据与磁盘强绑定不支持分散存储、不支持存储与运算分离、不支持冷热数据分层存储等设计缺陷在超大数据规模场景下显得尤为明显。所以在超大数据规模场景下Kafka集群面临了以下几个痛点。
资源利用率低。
无法快速响应业务增长。
故障恢复时间长。
历史数据消费故障率高(主要体现在磁盘io性能上)。
基于以上Kafka在架构设计上的缺陷,vivo Kafka团队于2021年开始对另一款开源分布式消息中间件Pulsar进行调研。
我们看下Pulsar的官网定义及发展史:Pulsar 是 Apache软件基金会的顶级开源项目,是集消息、存储、轻量化函数式计算为一体的下一代云原生分布式消息流组件,采用了计算与存储分离的架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有高并发、高吞吐、低延时、高可扩,高可用等特性。
Pulsar 诞生于2012 雅虎公司内部,2016年开源交由Apache软件基金会进行孵化,2018年成为Apache软件基金会顶级开源项目。
基于Pulsar支持存算分离,分区数据分散存储、冷热数据分层存储、Broker无状态等架构设计,让Pulsar在超大数据规模场景下具备了资源利用率较高、快速响应业务增长、秒级故障恢复、实时流量均衡、支持海量数据存储等明显优势。
我们对Pulsar组件的规划分为四个阶段,包含项目启动、稳定性建设、能力进阶、稳定运营。
目前我们处在Pulsar组件稳定性建设阶段。
2022年我们的目标是打造支持日均万亿级消息处理能力的Pulsar集群,完成分层存储,监控告警一体化、KoP功能平台化等扩展能力建设。
计划2023年打造具备日均十万亿级消息处理能力的Pulsar集群,达到行业一流水准。并完成Pulsar broker容器化部署、Pulsar生态体系建设、Pulsar Sql和Pulsar Function的应用调研等扩展能力建设。
将在2024年实现日均数十万亿级消息处理能力的Pulsar集群,达到行业超一流的水准。
RocketMQ是阿里巴巴于2012年开源的低延时、高并发、高可用、高可靠的分布式消息中间件,具有海量消息堆积、高吞吐、可靠重试等特性。
RocketMQ于2012年开源,2016年进入Apache孵化,于2017年成为Apache顶级项目。
vivo中间件团队在2021年引入RocketMQ并且完成了高可用和平台化建设。
当前分别在多个机房部署了多个集群供业务使用,每日消息量数百亿。
集群分布在多个机房,每日消息量级也不低,高可用运维保障是有难度的。
为了更好的保障集群的高可用,我们采用了双机房热备的方式进行集群搭建。
我们会在两个机房进行Broker的部署,业务Topic会默认分布在两个机房,以此来保障在一个机房内的Broker节点异常时业务可以保持正常生产消费能力。
业务默认是会优先使用本机房的节点进行生产消费,只有在异常时才会自动快速完成跨机房的流量切换。
同时我们构建了一个BrokerController模块用于实现Broker节点的主从切换,以此保障集群容量的快速恢复。
双机房热备模式有哪些优势呢?
第一,消息无需跨机房复制,降低对机房专线的依赖;
第二,可以降低业务发送消息的延时,也可以提升业务的处理性能;
双机房热备模式的劣势是每个机房的节点都需要冗余一定的buffer来支撑其它机房的节点异常时自动转移过来的业务流量。
集群双机房热备部署模式是消息平台的高可用基石,在此之外我们还建设了多个平台模块来保障平台的高可靠。
如上图所示,
mq-rebalance模块用于支撑集群流量的自动负载均衡;
mq-monitor模块进行监控指标的采集并且与vivo内部的监控系统打通;
mq-recover模块主要用于业务流量的降级和恢复处理;
mq-live模块用于集群的探活。
另外我们还基于社区的connector组件建设了RabbitMQ-connector,实现了全球消息路由能力。
后续我们计划建设基于gRPC协议建设通用的消息网关实现与集群的交互,以此屏蔽不同的消息中间件选型。
主要有三点实践:
第一,RocketMQ集群配置平台化管理。RocketMQ集群含有较多的配置项,默认是通过节点文件管理的,在大规模集群运维过程中会存在维护困难的问题。
通过平台化管理可以确保集群内配置统一,节点在启动时从平台中读取到所有的配置,避免在集群部署时需要登录到机器进行配置维护,并且我们支持了集群配置的动态生效。
第二,运维操作平台化,例如Broker节点的流量摘除与挂载、Topic一键扩缩容等直接通过平台支撑,实现便捷运维。
第三,集群维护平台化,我们将集群与Broker节点的关系维护在平台中,并且在首次部署时分配Broker节点所在集群,这样在平台上就有清晰的集群信息,便于我们维护管理多套集群。
一方面,我们为每个集群都建设了如上图所示的监控大盘。
在监控大盘中有每个集群的生产消费流量、业务生产消费统计、发送耗时等信息,支撑我们快速观察集群的运行状态,方便日常巡检。
消息中间件作为在线业务请求处理链路中的关键环节,高可用尤为关键。监控大盘中的发送耗时信息是我们认为观察集群是否稳定运行的最关键的指标。
另一方面,我们对集群构建了丰富的监控告警能力。
如上图所示,我们分别对主机维度、集群维度、Topic/Group维度、客户端维度都做了监控指标埋点上报。
通过丰富的监控告警,我们可以及时发现问题并快速介入处理问题,详细的监控告警也可以帮助我们快速确认问题根源。
分别从可用性保障、性能、容量、功能特性对比RabbitMQ和RocketMQ。
可用性保障方面,RabbitMQ集群无负载均衡能力,队列流量实际由集群内某个节点承载,存在瓶颈。其次RabbitMQ存在脑裂问题,从我们的运维经验看如果出现网络故障集群通常无法自动恢复,并且可能丢失少量数据。
性能方面,RabbitMQ集群整体性能较低,并且不支持水平扩展。
容量方面,从我们的运维经验看,当消息堆积到千万后,RabbitMQ性能就会有所下降。在大量消息堆积开始消费后,因为RabbitMQ的背压流控机制,最终可能会因为集群负载过大导致发送限流甚至发送阻塞。
功能特性方面,RabbitMQ不支持消费异常延时重投递功能,也不支持消息轨迹、事务消息、顺序消息等特性。
而RocketMQ在可用性保障、性能、容量、功能特性方面相对于RabbitMQ都是更优的。
可用性保障方面,RocketMQ采用多主多从的松耦合架构部署,主从间通过同步双写保障消息的可靠性和一致性。
性能方面,Topic可以分布在多个Broker中,实现水平扩容,并且RocketMQ支持从从节点拉取消息,读写分离的设计很好的支持了业务读取冷数据的诉求。
容量方面,RocketMQ使用磁盘存储,磁盘容量就是消息的存储容量,利用从从节点拉取冷数据特性,海量消息堆积对消息写入性能基本无影响。
功能特性方面,RocketMQ支持了消息轨迹、事务消息、顺序消息等特性。
综合分析,RocketMQ可以更好的支撑互联网业务的诉求。
由于当前RabbitMQ已经有数千个服务接入,为了让业务不修改代码即可迁移到RocketMQ,我们建设了一个AMQP消息网关来实现MQ协议的解析和流量转发。
如上图所示,MQ-Proxy模块用于解析AMQP协议,代理客户端的生产消费请求。
RabbitMQ的元数据信息存在在集群中,并且与RocketMQ元数据概念存在差异,为此我们建设了MQ-Meta模块用于维护Exchange/Queue极其绑定关系等元数据信息,并且Proxy模块可以动态感知元数据变更。
另外,为了更好的支撑业务诉求,我们对AMQP协议进行了扩展,支持了局部有序和批量消费能力。
为了更好的整合RabbitMQ和RocketMQ,我们对它们的元数据进行了一一对应。
其中将RabbitMQ的Exchange映射为RocketMQ的Topic,Queue映射为Group,RoutingKey映射为消息头的一个参数,VirtualHost映射为Namespace。
为什么将RoutingKey映射为消息头的一个参数而不是Tag呢?这是因为RabbitMQ的RoutingKey是有模糊匹配过滤能力的,而RocketMQ的Tag是不支持模糊匹配的。
另外我们还通过扩展使得RocketMQ也支持了RoutingKey过滤。
在经过多轮优化后,在1KB消息体场景下,一台8C16G的机器在单发送下可支撑超过九万的TPS,单推送可以支撑超过六万TPS,性能上很好的满足了当前业务的诉求。
主要有两部分:
一方面,我们希望可以调研升级到RocketMQ5.0版本架构,RocketMQ5.0的存算分离架构可以更好的解决我们当前遇到的存储瓶颈问题,Pop消费可以帮助我们实现更好的消费负载均衡。
我们还希望可以基于gRPC协议建设统一的消息网关能力。
另一方面,我们希望可以探索消息中间件容器化部署,提供消息中间件的快速弹性扩缩容能力,更好的支持业务需求。
回顾vivo消息中间件演进历史,我们完成了在线业务消息中间件从RabbitMQ迁移到RocketMQ,大数据消息中间件正在从kafka演进为使用pulsar支撑。
我们理解消息中间件将继续朝着云原生演进,满足业务快速增长的诉求,充分利用云的优势为业务提供极致的体验。
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
我认为我的问题最好用一个例子来描述。假设我有一个名为“Thing”的简单模型,它有一些简单数据类型的属性。像...Thing-foo:string-goo:string-bar:int这并不难。数据库表将包含具有这三个属性的三列,我可以使用@thing.foo或@thing.bar之类的东西访问它们。但我要解决的问题是当“foo”或“goo”不再包含在简单数据类型中时会发生什么?假设foo和goo代表相同类型的对象。也就是说,它们都是“Whazit”的实例,只是数据不同。所以现在事情可能看起来像这样......Thing-bar:int但是现在有一个新的模型叫做“Whazit”,看起来
我有一个要在我的Rails3项目中使用的数组扩展方法。它应该住在哪里?我有一个应用程序/类,我最初把它放在(array_extensions.rb)中,在我的config/application.rb中我加载路径:config.autoload_paths+=%W(#{Rails.root}/应用程序/类)。但是,当我转到railsconsole时,未加载扩展。是否有一个预定义的位置可以放置我的Rails3扩展方法?或者,一种预先定义的方式来添加它们?我知道Rails有自己的数组扩展方法。我应该将我的添加到active_support/core_ext/array/conversion
我以为它们存储在cookie中-但不,检查cookie没有任何结果。session也不存储它们。那么,我在哪里可以找到它们?我需要这个来直接设置它们(而不是通过flashhash)。 最佳答案 它们存储在inyoursessionstore.自rails2.0以来的默认设置是cookie存储,但请检查config/initializers/session_store.rb以检查您是否使用默认设置以外的东西。 关于ruby-on-rails-闪存消息存储在哪里?,我们在StackOverf