文章目录
RocketMQ 的事务消息,是指Producer端消息发送事件和本地事务事件,同时成功或同时失败
RocketMQ实现事务主要分为两个阶段: 正常事务的发送及提交、事务信息的补偿流程(都是针对生产者 因为事务只出现在DataBase中 有些情况需要将消息存储在数据库中 如果发生事务问题…)
整体流程为:
- 正常事务发送与提交阶段
- 生产者发送一个半消息给broker(半消息是指的暂时不能消费的消息)
- 服务端响应
- 开始执行本地事务
- 根据本地事务的执行情况执行Commit或者Rollback
- 事务信息的补偿流程
- 如果broker长时间没有收到本地事务的执行状态,会向生产者发起一个确认会查的操作请求
- 生产者收到确认会查请求后,检查本地事务的执行状态
- 根据检查后的结果执行Commit或者Rollback操作 补偿阶段主要是用于解决生产者在发送Commit或者Rollbacke操作时发生超时或失败的情况

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费.这里RocketMQ实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC.这样由于消费者没有订阅这个主题,所以不会消费.
在本地事务执行完成后回向Broker发送Commit或者Rollback操作,此时如果在发送消息的时候生产者出故障了,要保证这条消息最终被消费,broker就会向服务端发送回查请求,确认本地事务的执行状态.当然RocketMQ并不会无休止的发送事务状态回查请求,默认是15次,如果15次回查还是无法得知事务的状态,RocketMQ默认回滚消息(broker就会将这条半消息删除)
TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。
使用
创建生产者时我们不在简单地创建DefaultMQProducer 而是RocketMQ事务专属的 TransactionMQProducer 并且不再简单地发送消息了 而是设置一个事务监听器 setTransactionListener(new TransactionListener(){…}); 实现接口方法 并且由于监听器需要等待本地事务的执行情况我们不能再生产者发送完消息后关闭
Producer
/**
* 事务消息生产者
*/
public class TransactionMessageProducer {
/**
* 事务消息监听实现
*/
private final static TransactionListener transactionListenerImpl = new TransactionListener() {
/**
* 在发送消息成功时执行本地事务
* @param msg
* @param arg producer.sendMessageInTransaction的第二个参数
* @return 返回事务状态
* LocalTransactionState.COMMIT_MESSAGE:提交事务,提交后broker才允许消费者使用
* LocalTransactionState.RollbackTransaction:回滚事务,回滚后消息将被删除,并且不允许别消费
* LocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// TODO 开启本地事务(实际就是我们的jdbc操作)
// TODO 执行业务代码(插入订单数据库表)
// int i = orderDatabaseService.insert(....)
// TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作)
// 模拟一个处理结果
int index = 8;
/**
* 模拟返回事务状态
*/
switch (index) {
case 3:
System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys());
return LocalTransactionState.ROLLBACK_MESSAGE;
case 5:
case 8:
return LocalTransactionState.UNKNOW;
default:
System.out.println("事务提交,消息正常处理");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
/**
* Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),
* 由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback
* @param msg
* @return 返回事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commit
String transactionId = msg.getTransactionId();
String key = msg.getKeys();
System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key, msg.getMsgId(), transactionId);
if ("id_5".equals(key)) { // 刚刚测试的10条消息, 把id_5这条消息提交,其他的全部回滚。
System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys());
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.printf("未查到本地事务状态,回滚消息,id:%s%n", msg.getKeys());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
};
public static void main(String[] args) throws MQClientException, IOException {
// 1. 创建事务生产者对象
// 和普通消息生产者有所区别,这里使用的是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("GROUP_TEST");
// 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
producer.setNamesrvAddr("192.168.100.242:9876");
// 3. 设置事务监听器
producer.setTransactionListener(transactionListenerImpl);
// 4. 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
String content = "Hello transaction message " + i;
Message message = new Message("TopicTest", "TagA", "id_" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5. 发送消息(发送一条新订单生成的通知)
SendResult result = producer.sendMessageInTransaction(message, i);
System.out.printf("发送结果:%s%n", result);
}
System.in.read();
// 6. 停止生产者
producer.shutdown();
}
}
Consumer 整个事务消息环节与Consumer相关不大,所以不用对原来的Consumer进行修改 正常接收消息即可.
- 事务消息不支持定时和批量
- 为了避免一个消息被多次检查,导致半数队列消息堆积,RocketMQ限制了单个消息的默认检查次数为15次,通过修改broker配置文件中的transactionCheckMax参数进行调整
- 特定的时间段之后才检查事务,通过broker配置文件参数transactionTimeout或用户配置CHECK_IMMUNITY_TIME_IN_SECONDS调整时间
- 一个事务消息可能被检查或消费多次
- 提交过的消息重新放到用户目标主题可能会失败
- 事务消息的生产者ID不能与其他类型消息的生产者ID共享
分布式事务,是一个在每个微服务项目中都绕不开的问题。常见的解决分案有通过Redis、zk、mq、seata等方式处理。这篇博文全面的分析一下RocketMq中事务消息的机制。
事务的经典场景有很多,如银行转账、订单库存等等。相对于分布式事务来说,订单系统和库存系统间的事务场景更为形象。如:用户操作下单,我们首先需要生成一条订单信息,然后库存系统需要针对订单中的商品进行库存扣减的操作。这两步操作必须保证数据的一致性,否则会出现库存超扣等情况。
第一种情况如图所示,在本地事务提交前发送事务消息。若在创建订单信息时发生了异常,而此时事务消息已经成功发送,库存系统消费事务消息就会导致订单并没有创建成功,而库存却被扣减。

进而有了第二种情况,如图所示,在本地事务提交完成后再发送事务消息。若在发送事务消息的过程发生了异常,如网络波动等等,将会出现订单已创建完成,而库存系统永远也监听不到消息,导致库存无法正常扣减。

综合第一和第二种情况,汇总成第三种方案如图所示。在本地事务执行前,先向MQ发送前置的Prepared消息,在本地事务执行完毕后,再发送确认的消息,告知MQ当前事务消息需提交/回滚。如果事务正常提交成功,那么这条消息将会被消息消费方监听到;如果事务回滚,MQ会丢弃这条消息,消息消费方无法监听到这条消息。以上情况对应 事务消息生产者的设计思路 图中的 1、2、3、4步骤。

继续分析,如果上图的第二步中,发送确认消息的过程中,出现异常,没有发送成功怎么办?RocketMQ会定期(默认60s)扫描Prepared消息,如果迟迟没有收到确认消息,将会执行事务回查的逻辑,主动去消息生产方确认事务状态。对应 事务消息生产者的设计思路 图中的 5、6、7步骤。综上,是事务消息中生产者的设计思路,保证本地事务和事务消息一致性。

如上图中,在事务消息者中,如果步骤4返回了消费失败或者超时未响应的情况,怎么办?RocketMQ对待事务消息的处理和普通消息一样。如果消费失败或超时,将会把这条消息加入到重试队列中,不断是重复执行步骤3、4,如果重复的次数达到阈值,那么可能需要人工介入处理。
如果消费方本地事务执行成功,仅仅是在确认消息时失败呢? 那么这里又会出现另一个问题 重复消费? 这里就需要具体的业务模块去处理消息的幂等性。如接住Redis来处理。如在本地事务执行前先去查询redis中当前消息是否已经消费,执行成功后再向redis写入一条成功消费的记录,这样就能保证消费不会被重复消费了。
Q&A
Q:从一致性方面考虑,直接采用RPC也可以完成,RPC也支持重试,为什么还要采用MQ?
A:首先应该分清MQ和RPC的应用场景,在现在微服务的架构下,所有人都强调低耦合高内聚,做业务上的解耦,直接采用RPC的方式就会出现强依赖,与微服务的理念背道而驰。
Q:为什么事务消息消费失败后,需要人工介入处理?
A:首先对于一个复杂的系统来讲,将实现整个业务逻辑回滚的代价是巨大的,不但系统复杂度将大大提升,而且还会引入新的问题,如在回滚的过程中又出现了其他事务异常,又该如何处理?其次在一个健壮的系统中出现事务回滚的情况本来就是概率极低的情况,在程序设计时,需要衡量一下为解决这个问题付出的人力物力成本值不值得。
Q:为什么不直接在消息服务层面解决重复消费的问题?
A:消费重复消费解决可以从两个方面考虑。第一 消费方处理消息的业务逻辑保持幂等性,只要保持幂等性,不管重复消费多少次,结果都是一样的;第二保证每条消息都有唯一编号且保证消费处理成功和去重表的日志同时出现,正常情况下出现重复消费的概率并不大,如果消费系统对所有的消费都做处理的话,对系统的吞吐量和高可用会产生影响,所以最好由各自业务系统决定如果处理重复消费。
Q:RocketMQ没能从根本上结果分布式事务问题
A:RocketMQ自身没办法做到像本地事务处理添加@Transactional注解就可以完成事务的提交和回滚。如果有需要,可以尝试使用seata中间件来处理分布式事务。
参考文章:
https://blog.csdn.net/D1842501760/article/details/123142298
https://blog.csdn.net/lishuzhen5678/article/details/122666090
我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
我以为它们存储在cookie中-但不,检查cookie没有任何结果。session也不存储它们。那么,我在哪里可以找到它们?我需要这个来直接设置它们(而不是通过flashhash)。 最佳答案 它们存储在inyoursessionstore.自rails2.0以来的默认设置是cookie存储,但请检查config/initializers/session_store.rb以检查您是否使用默认设置以外的东西。 关于ruby-on-rails-闪存消息存储在哪里?,我们在StackOverf
我已经开始使用mysql2gem。我试图弄清楚一些基本的事情——其中之一是如何明确地执行事务(对于批处理操作,比如多个INSERT/UPDATE查询)。在旧的ruby-mysql中,这是我的方法:client=Mysql.real_connect(...)inserts=["INSERTINTO...","UPDATE..WHEREid=..",#etc]client.autocommit(false)inserts.eachdo|ins|beginclient.query(ins)rescue#handleerrorsorabortentirelyendendclient.commi
我正在尝试在ruby脚本中连接到服务器https://www.xpiron.com/schedule。但是,当我尝试连接时:require'open-uri'doc=open('https://www.xpiron.com/schedule')我收到以下错误消息:OpenSSL::SSL::SSLError:SSL_connectreturned=1errno=0state=SSLv2/v3readserverhelloA:sslv3alertunexpectedmessagefrom/usr/local/lib/ruby/1.9.1/net/http.rb:678:in`conn