【写在前面】
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,是异步RPC的主要手段之一。
作为分布式系统架构中的一个重要组件,消息中间件的选型,变成了一个炙手可热的话题。
【消息队列的种类】
现在,市面上的消息队列种类可谓是种类繁多,主流的消息队列差不多有以下几种:
由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本,基于zookeeper协调的分布式消息系统「现在最新版本的Kafka已经可以不依赖ZK独立运行」,现已捐献给Apache基金会。
它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。
目前很多的开源分布式处理系统如Cloudera、Apache Storm、Spark、Flink等都支持与Kafka集成。
采用Erlang语言实现的AMQP协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。
使用场景非常丰富,被广泛应用于云相关的架构中,如:OpenStack。
阿里出品,已经捐献个Apache基金会。
Java编写,具备高吞吐量、高可用性、适合大规模分布式系统应用等特点。
Apache出品,使用Java编写,市场份额并不高,最新架构被命名为Apollo。
号称史上最快的消息队列,基于C语言开发。
可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归类于消息队列,但是其和前面的几种有着本质的区别,ZeroMQ本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的Socket API上加了一层封装。
【几种特殊的消息队列】
普通的消息队列,一般是先进先出,但是有些时候,需要将部分重要的消息后进先出,优先级队列就起作用了。
优先级队列主要用于消息积压时,优先级较高的系统的消息需要被更早消费的场景。
对于一部分场景,并不希望消费者可以立即拿到消息尽心消费,而是等待一段时间再进行消费,对队列进行延迟消费,比如:
服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
比如缓存中的对象,超过了空闲时间,需要从缓存中移出。
在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
下单之后如果三十分钟之内没有付款就自动取消订单。
对于延时任务的实现方式,一般会有两种:
由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。
变成死信队列一般有以下几种情况:
重试队列可以看做是一种特殊的回退队列。
在消费端进行消费的时候,为防止消费失败导致消息直接被未执行而丢弃,所以在消费失败后重新将其加入到队列中,等待消费者再次消费。
很多重试队列都会设置间隔时间的策略,可以设置每隔10s重试一次,也可以设置第一次隔5s,第二次隔10s,第三次隔30s,避免系统的短暂故障导致队列重试次数快速增长。
重试越多次重新投递的时间就越久,为此需要设置一个上限,超过投递次数就入死信队列。
【消息模式】
消费模式分为推(push)模式和拉(pull)模式。
消息队列一般有两种传递模式:
一条消息只会被一个消费者消费,消费以后队列中将不会继续存储词条消息。
这种模式下,有一个核心的概念叫做Topic,生产者发布携带某个Topic的消息,消费者消费携带某个Topic的消息。
Topic的存在,解耦了生产者和消费者,不需要进行相互接触即可成功进行生产和消费。
RabbitMQ是一种典型的点对点模式,而Kafka是一种典型的发布订阅模式。
但是RabbitMQ中可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果,Kafka中,也可以通过设置消费者组实现点对点的效果。
【消息存储】
一般消息在消费完成后就被注销了。
消息回溯指的是,在消息被消费之后,依旧能再次消费到这个消息。消息回溯的主要目的是复现旧场景。
此外,很多其他功能也会依赖消息回溯,比如:索引恢复,本地缓存重建,补偿机制等。
消息堆积功能,直接决定了消息是否支持削峰。
消费者的能力都是有限的,当大量消息被生产的时候,如果没有消息堆积,消息队列可能会直接崩溃。
消息堆积也有两种实现方式:
在分布式系统中,消息追踪的功能非常重要,有了消息追踪,我们可以做到对消息的全链路跟踪,进而对定位问题有很好的帮助。
【附加功能】
流量控制(flow control)针对的是发送方和接收方速度不匹配的问题,提供一种速度匹配服务抑制发送速率使接收方应用程序的读取速率与之相适应。通常的流控方法有Stop-and-wait、滑动窗口以及令牌桶等。
保证消息有序。这个功能有个很常见的应用场景就是CDC(Change Data Chapture),以MySQL为例,如果其传输的binlog的顺序出错,比如原本是先对一条数据加1,然后再乘以2,发送错序之后就变成了先乘以2后加1了,造成了数据不一致。
确保消息在生产者和消费者之间进行传输而言一般有三种传输保障(delivery guarantee):
对于大多数消息中间件而言,一般只提供At most once和At least once两种传输保障,对于第三种一般很难做到,由此消息幂等性也很难保证。
事务对于使用过数据库的同学来讲并不陌生,要么全部成功,要么全部失败。
而对于消息中间件来讲,事务概念指的是要么发送成功,要么发送失败。
【协同和隔离】
在Kafka 0.9版本之后增加了身份认证和权限控制两种安全机制。
身份认证是指客户端与服务端连接进行身份认证,包括客户端与Broker之间、Broker与Broker之间、Broker与ZooKeeper之间的连接认证。
目前支持SSL、SASL等认证机制。
权限控制是指对客户端的读写操作进行权限控制,包括对消息或Kafka集群操作权限控制。
权限控制是可插拔的,并支持与外部的授权服务进行集成。
对于RabbitMQ而言,其同样提供身份认证(TLS/SSL、SASL)和权限控制(读写操作)的安全机制。
在软件架构设计中,很多时候我们需要考虑到多用户环境下,不同用户之间数据的相互隔离。
RabbitMQ支持多租户技术, 每个租户叫一个vhost。
一般消息层面的协议有AMQP、MQTT、STOMP、XMPP等(消息领域中的JMS更多的是一个规范而不是一个协议),支持的协议越多其应用范围就会越广,通用性越强。
有的消息协议是基于本身的私有协议运转的。「Kafka」
支持多种语言,也在一定程度上决定了消息队列是适用性。
其实,现在很多的中间件,都在向语言无关的方向发展,其实也是一种趋势。
【比较】
下图对市面上流行的消息中渐渐做了一个比较:
|
功能 |
Kafka |
RabbitMQ |
ZeroMQ |
RocketMQ |
ActiveMQ |
|
支持的协议 |
自己定义的一套(基于TCP的协议) |
AMQP |
TCP/UDP |
自己定义的一套 |
OpenWire、STOMP、REST、XMPP、AMQP |
|
存储方式 |
内存、磁盘、数据库。支持大量堆积。 |
内存、磁盘。支持少量堆积。 |
消息发送端的内存或者磁盘中。不支持持久化。 |
磁盘。支持大量堆积。 |
内存、磁盘、数据库。支持少量堆积。 |
|
事务支持 |
支持 |
支持,使用事务会使性能有所下降 |
不支持 |
支持 |
支持 |
|
负载均衡 |
支持负载均衡 |
对负载均衡的支持不好 |
去中心化,不支持负载均衡,本身只是一个多线程网络库 |
支持负载均衡 |
支持负载均衡,需要通过集成zookeeper来实现负载均衡 |
|
集群 |
天然的 Leader-Slave 无状态集群,每台服务器既是Master也是Slave。 |
支持简单集群,复制模式,对高级集群模式支持不好。 |
去中心化,不支持集群。 |
常用多对 Master-Slave 模式,开源版本需手动切换Slave变成Master。集群的slave会从master拉取数据备份,master分布在不同的broker上。 |
支持简单集群模式,比如'主-备',对高级集群模式支持不好。 |
|
高可用 |
非常高(分布式) |
高(主从) |
高 |
非常高(分布式) |
高(主从) |
|
消息重复 |
支持at least once、at most once |
支持at least once、at most once |
既不支持at least once、也不支持at most once、更不支持exactly only once |
支持at least once |
支持at least once |
|
吞吐量(TPS) |
灰常大,Kafka 按批次发送消息和消费消息 |
比较大 |
非常大 |
大,rocketMQ 接收端可以批量消费消息,可以配置每次消费的消息数,但是发送端不是批量发送。 |
比较大 |
|
消息订阅与发布 |
基于topic以及按照topic进行正则匹配的发布订阅模式 |
提供了4种:direct, topic ,Headers和fanout |
点对点(P2P) |
基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式 |
点对点(P2P)、广播(发布-订阅) |
|
消息顺序支持 |
支持 |
不支持 |
不支持 |
支持 |
不支持 |
|
消息确认支持 |
支持 |
支持 |
支持 |
支持 |
支持 |
|
消息回溯支持 |
支持指定分区offset位置的回溯 |
不支持 |
不支持 |
支持指定时间点的回溯 |
不支持 |
|
消息重试支持 |
不支持,但是可以实现 |
不支持,但是可以利用消息确认机制实现 |
不支持 |
支持 |
不支持 |
|
并发度 |
高 |
极高 |
高 |
高 |
高 |
|
管理界面 |
一般 |
好 |
无 |
无 |
一般 |
【性能】
对于消息队列来讲,性能的重要性,有时候比功能还要高。
从上面那张表中我们可以看出,总体来讲,RabbitMQ在功能的健全性上面,是要比Kafka更强的,但是Kafka的吞吐量,是比RabbitMQ高出数十倍的。一般来讲,单机RabbitMQ的QPS在万级别以内,但是Kafka的单机QPS,号称可以达到百万级。
|
注:吞吐量是会受限于硬件设施的,如:网络带宽,磁盘读写速度等。 |
作为性能的一大考察点,时延一般会被人们所忽视,原因在于:对时延有要求的系统,一般都不会使用消息队列去进行系统间通信,而是直接通过RPC调用去实现。消息中间件的消息堆积能力,就注定了消息是可接受时延的。
【可靠性】
对于消息中间件而言,每个消息中间件都存在消息丢失的问题。
对于Kafka而言,是通过ISR协议来保证消息的可靠:
|
在zk中会保存AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR ISR(in sync replica) 是kafka动态维护的一组同步副本。 在ISR中有成员存活时,只有这个组的成员才可以成为leader,内部保存的为每次提交信息时必须同步的副本(acks = all时),每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务,当ISR中的副本被认为坏掉的时候,会被踢出ISR,当重新跟上leader的消息数据时,重新进入ISR。 OSR(out sync replica) 保存的副本不必保证必须同步完成才进行确认。 OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。 |
而对于RabbitMQ而言,需要有两个层面的保障:
如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息。
但是问题在于,事务机制太过于损耗性能,所以一般还可以采用下面这种方式。
在生产者开启confirm模式之后,每次写的消息都会分配一个唯一的id,如果写入成功,则会回传一个ack消息,如果没能处理,则会回调一个nack接口,然后生产者即会重试。
在这个机制下,可以自己在内存中维护每个消息id的状态,如果超出一定时间还没有收到此消息的回调,就可以重发。
需要开启RabbitMQ的持久化,在消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后,会自动从磁盘读取之间的数据。
这样的话,就和Kafka很类似了,Kafka就是把消息直接写入到磁盘的。
【管理和运维】
在消息中间件的使用过程中,不可避免地会出现各种各样的问题,比如前文中提到的消息的丢失,性能的监控,横向扩容,日常告警,灰度发布,灾备处理等。
所以我们在选用一个消息中间件的时候,还需要考虑到这些扩展能力是否够用,如果不够用,是否支持定制化开发,扩展插件等等。
这些非核心能力点对于用户来讲也是非常重要的,而对于这些功能点来讲,在很大程度上其实取决于社区的活跃度和生态的发展,所以,在做技术选型的时候,最好选用当下比较热门以及有大公司做背书的技术,这样后续的版本更新和漏洞修复都有保障。
【自研】
对于很多大公司而言,为了满足自己个性化的需求,一般都会采用先自研,后开源的方式,比如LinkedIn的Kafka,阿里的RocketMQ等等,其实当时市面上已经有相对好用的产品如:ActiveMQ,RabbitMQ,但是每个公司的设计理念和产品需求都是不一样的,所以,对于大厂而言,自研也是一条不错的路。
但是无论是采用市面上开源的产品去进行定制化开发,还是自己去从0开始平地起高楼,我们都要先问自己几个问题:
我们是否真的需要自研?无论是消费模式,还是消息存储,或者是协议的支持,市面上的产品是不是真的不满足我们的要求?
若要自研,我们的具体场景是什么?采用什么语言设计?若采用Java,可不可以,对Java中的ArrayBlockingQueue做一个简单的封装,或者基于文件、数据库、Redis等底层存储封装而形成一个消息中间件。亦或者,根据Kafka或者RabbitMQ的可扩展性,来选择性的扩展。
对于JD而言,其实已经有了众多的消息队列产品如:***,***,***,***等等,品类繁多,尚未开源。
每个产品都有每个产品适应的场景,但是对于同一家企业来讲,难免有重复造轮子的嫌疑,笔者也曾遍历了以上所述的这些所有产品,其实在基本功能上,相似度是很高的,所以有一个大胆的想法,我们可以集中开发投入做一个基本的消息队列系统,然后在扩展性上做一些响应的侧重,这样,每个产品在自己的个性化需求上面可以有自己的设计,但是都使用同一个内核,这样是否可以在降本增效上面更进一步?都是我们需要考虑的问题。
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/