RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了 。

RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
先看一下是什么原因造成了发送消息时候的顺序错乱
发送消息的顺序性,一般来说不做要求,但是如果一定要求顺序,可以使用锁机制配合 ack机制 来保证消息的顺序到达
消息队列中的消息是遵循FIFO(先进先出)原则,天然有序
有这样一个订单操作,insert 、update、delete连续操作,并且消息已经顺序存在queue中,那么如何保证消费顺序是insert 、update、delete,而不是delete、insert 、update呢?
方案一:拆分多个queue,每个queue一个consumer,该条订单的相关操作全部放到这个queue中,由这一个consumer消费,这样做多了一些queue。
方案二:就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,该条订单相关的消息全部放到一个队列中,然后分发给底层不同的worker线程来处理
实际上消息队列是没法百分百保证不丢失的,我们只能尽量降低概率,然后在消息丢失后记录日志,再处理
有这样一个典型的订单场景

要解决上述问题,就是要保证消息一定要可靠地被消费,那么我们可以来分析下消息有哪些步骤会出问题
RabbitMQ 发送的消息是这样的 , 消息被生产者发到指定的交换机根据路由规则路由到绑定的队列,然后推送给消费者 。

在这个过程中,可能会出一下问题
下面是单消费实例的解决方案
多实例的先留个坑,以后再填
RabbitMQ 提供了确认和回退机制,有一个异步监听机制,每次发送消息,如果成功/未成功发送到交换机都可以触发一个监听ConfirmCallback(),从交换机路由到队列失败也会有一个监听ReturnsCallback()。只需要开启这两个监听机制,使用记录日志、发送邮件通知、落库定时任务扫描重发这些应对策略
生产者弄丢数据其实及其罕见,落库定时任务扫描重发工作量大,一般记录日志后,发邮件给对应人员,补充数据库数据即可
宕机重启不开启持久化的情况下 RabbitMQ 重启之后所有队列和消息都会消失,所以我们创建队列时设置持久化
RabbitMQ 给我们提供了消费者应答(ack)机制,默认情况下这个机制是自动应答,只要消息推送到消费者就会自动 ack ,然后 RabbitMQ 删除队列中的消息。启用手动应答之后我们在消费端调用 API 手动 ack 确认之后,RabbitMQ 才会从队列删除这条消息 。
开启手动ack,在业务处理完成之后手动ack即可,如果在业务处理过程中出异常了,队列会给消费者重推,也要注意重推导致的循环异常,可以配置重试次数策略。
这个也是生产环境业务中经常出现的场景,重复消费也要从两方面分析,为什么会出现重复消费
在网络波动的情况下,生产者给MQ服务器发送消息,由于网络原因导致生产者没有收到ACK确认消息,但是MQ服务器实际上已经接收到了消息,在这种情况下生产者就会重新发送一遍刚才的消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免broker落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
有了这个inner-msg-id,就能保证即使重发,也只有1条消息落到MQ-server的DB中
在消费者方面如果出现网络问题,比如消费者对消息已经成功消费了,在向MQ服务器进行确认的时候网络异常了,这时候MQ服务器就没有接收到确认,MQ为了保证消息被消费,就会继续向消费者发送之前已经被消费了的消息,这种情况下消费者就会接收到两条一样的消息。
我们解决消息重复消费主要是保证消费的幂等性,有两种角度,第一种就是不让消费端执行两次,第二种是让它重复消费了,但是不会对我的业务数据造成影响就行。通常可以在发消息的时候携带业务唯一id,消费成功后保存到redis/db中,消费前再检查下有没有这个ID,有的话就表示已经消费过了,或者使用数据库唯一性主键约束,再或者使用cas,最后遇到重复消息丢弃消息即可
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
我遵循了教程http://gettingstartedwithchef.com/,第1章。我的运行list是"run_list":["recipe[apt]","recipe[phpap]"]我的phpapRecipe默认Recipeinclude_recipe"apache2"include_recipe"build-essential"include_recipe"openssl"include_recipe"mysql::client"include_recipe"mysql::server"include_recipe"php"include_recipe"php::modul
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
一段时间以来,我一直在使用open_uri下拉ftp路径作为数据源,但突然发现我几乎连续不断地收到“530抱歉,允许的最大客户端数(95)已经连接。”我不确定我的代码是否有问题,或者是否是其他人在访问服务器,不幸的是,我无法真正确定谁有问题。本质上,我正在读取FTPURI:defself.read_uri(uri)beginuri=open(uri).readuri=="Error"?nil:urirescueOpenURI::HTTPErrornilendend我猜我需要在这里添加一些额外的错误处理代码...我想确保我采取一切预防措施来关闭所有连接,这样我的连接就不是问题所在,但是我
我正在构建一个小部件来显示奥运会的奖牌数。我有一个“国家”对象的集合,其中每个对象都有一个“名称”属性,以及奖牌计数的“金”、“银”、“铜”。列表应该排序:1.首先是奖牌总数2.如果奖牌相同,按类型分割(金>银>铜,即2金>1金+1银)3.如果奖牌和类型相同,则按字母顺序子排序我正在用ruby做这件事,但我想语言并不重要。我确实找到了一个解决方案,但如果感觉必须有更优雅的方法来实现它。这是我做的:使用加权奖牌总数创建一个虚拟属性。因此,如果他们有2个金牌和1个银牌,加权总数将为“3.020100”。1金1银1铜为“3.010101”由于我们希望将奖牌数排序为最高的,因此列表按降序排
这就是我做的a="%span.rockets#diamonds.ribbons.forever"a=a.match(/(^\%\w+)([\.|\#]\w+)+/)putsa.inspect这是我得到的#这就是我想要的#帮助?我尝试过但失败了:( 最佳答案 通常,您不能获得任意数量的捕获组,但如果您使用扫描,您可以为您想要捕获的每个标记获得一个匹配:a="%span.rockets#diamonds.ribbons.forever"a=a.scan(/^%\w+|\G[.|#]\w+/)putsa.inspect["%span","