草庐IT

Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

鸢尾の 2023-05-21 原文
RabbitMQ–了解中间件、常用的中间件、分布式系统使用中间件、Docker安装rabbitmq及遇到的问题、RabbitMQ核心组成、消息模式
Springboot整合RabbitMQ(Fanout、Direct、Topic模式)、设置队列信息TTL、死信队列、RabbitMQ磁盘监控,内存控制
Springboot+Rabbitmq消费者注解详解、改序列化方式
Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)
Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

消息确认机制

前言

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!

在rabbitmq管理页面上可以详细看到队列中的消息情况:

  • ready: 队列中存在的消息,可以提供给消费者的消息数量
  • Unacked: 表示是发送给消费者了,但是消费者还未将ack反馈给MQ的消息数量(一般只有设置了手动ack时,当消费者获取到消息时才会有值)

盲区

自动ACK: 消费者配置中如果是自动ack机制,MQ将消息发送给消费者后直接就将消息给删除了,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了消费者服务中配置的最大重试次数后将会直接抛出异常不再重试。

手动ACK:消费者设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。

认知

RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。

其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)

实践

发送方确认

  • ConfirmCallback()方法,是一个回调方法,生产者将消息发送给Broker(RabbitMQ服务),然后Broker给回调生产者的ConfirmCallback()方法告知生产者消息是否接收到。也就是确认消息是否正常到达 Exchange 中。

    # 我们需要在生产者中添加配置,表示开启发布者确认(注意新旧版本)
    spring.rabbitmq.publisher-confirm-type=correlated # 新版本
    spring.rabbitmq.publisher-confirms=true # 老版本
    
  • ReturnCallback()方法同样是一个回调方法,是交换机和队列之间的消息确认方式。启动消息失败返回,此方法是在交换器路由不到队列时触发回调,这个可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了

    # 在生产者中配置,表示发布者返回
    spring.rabbitmq.publisher-returns=true
    

使用

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    # 消息确认(ACK)
    publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
    publisher-returns: true #确认消息已发送到队列(Queue)

理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。

通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。

配置类方式(全局方式):

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);

    //确认消息送到交换机(Exchange)回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("\n确认消息送到交换机(Exchange)结果:");
        System.out.println("相关数据:" + correlationData);
        System.out.println("是否成功:" + ack);
        System.out.println("错误原因:" + cause);
    });

    //确认消息送到队列(Queue)回调
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
        System.out.println("\n确认消息送到队列(Queue)结果:");
        System.out.println("发生消息:" + returnedMessage.getMessage());
        System.out.println("回应码:" + returnedMessage.getReplyCode());
        System.out.println("回应信息:" + returnedMessage.getReplyText());
        System.out.println("交换机:" + returnedMessage.getExchange());
        System.out.println("路由键:" + returnedMessage.getRoutingKey());
    });
    return rabbitTemplate;
}

以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。

发送服务类实现(局部方式)

将发送的服务类实现接口,实现回调

@Service
public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    private static Logger logger = LoggerFactory.getLogger(SendMessageService.class);

    @Autowired
    public RabbitTemplate rabbitTemplate;

    public void sendMessage(String str){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);
        // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
        rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString()));
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("sender return success" + returnedMessage.toString());
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");

        if (!b) {
            logger.error("消息发送异常!");
            // 进行处理
        } else {
            logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
        }
    }
}

接口方式访问下。没问题

需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!

接收方确认

消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。

RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

消息确认模式有:

  1. AcknowledgeMode.NONE:自动确认。
  2. AcknowledgeMode.AUTO:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。

  • Basic.Ack 命令:用于确认当前消息。
  • Basic.Nack 命令:用于否定当前消息(批量拒绝) 。
  • Basic.Reject 命令:用于拒绝当前消息(单量拒绝)。

配置,注意是simple模式的ack还是direct模式,或者两个都设置上

server:
  port: 9000

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        acknowledge-mode: manual
    # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
    addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673

消费者接收数据

其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用@Header(AmqpHeaders.DELIVERY_TAG)可以直接获取到这个tag。

@Component
public class MQConsumer {
    
    @Autowired
    private DispatcherService dispatcherService;
    
    @RabbitListener(queues = "order.queue")
    public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            System.out.println("消息:" + orderMsg);
            JSONObject order = JSONObject.parseObject(orderMsg);
            String orderId = order.getString("orderId");
            // 派单处理
            dispatcherService.dispatch(orderId);

            System.out.println(1 / 0); // 出现异常
            // 手动确认
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 如果出现异常的情况下 根据实际情况重发
            // 重发一次后,丢失
            // 参数1:消息的tag
            // 参数2:多条处理
            // 参数3:重发
                // false 不会重发,会把消息打入到死信队列
                // true 重发,建议不使用try/catch 否则会死循环
            
            // 手动拒绝消息
            channel.basicNack(tag, false, false);
        }
    }
    
}

扩展

消息的接收者也可使用普通类实现ChannelAwareMessageListener接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。

 /**
 * 接收者
 *
 **/
@Component
public class Consumer implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
            {
                System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行queue_name中的消息的业务处理流程......");
            }
 
            if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
            {
                System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
                System.out.println("执行fanout.A中的消息的业务处理流程......");
            }
 			// 手动提交ack,并且批量确认消息
            channel.basicAck(deliveryTag, true);
        }
        catch (Exception e)
        {
            e.printStackTrace();
 
            /**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag, true);
        }
    }
}

参考

参考自

RabbitMQ消息确认机制(ACK)

springboot rabbitmq ACK手动确认

有关Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区的更多相关文章

  1. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  2. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  3. ruby-on-rails - 设计注册确认 - 2

    我在我的项目中有一个用户和一个管理员角色。我使用Devise创建了身份验证。在我的管理员角色中,我没有任何确认。在我的用户模型中,我有以下内容:devise:database_authenticatable,:confirmable,:recoverable,:rememberable,:trackable,:validatable,:timeoutable,:registerable#Setupaccessible(orprotected)attributesforyourmodelattr_accessible:email,:username,:prename,:surname,:

  4. ruby - 在 RSpec 中 stub /模拟全局常量 - 2

    我有一个gem,它有一个根据Rails.env的不同行为的方法:defself.envifdefined?(Rails)Rails.envelsif...现在我想编写一个规范来测试这个代码路径。目前我是这样做的:Kernel.const_set(:Rails,nil)Rails.should_receive(:env).and_return('production')...没关系,只是感觉很丑。另一种方法是在spec_helper中声明:moduleRails;end而且效果也很好。但也许有更好的方法?理想情况下,这应该有效:rails=double('Rails')rails.sho

  5. ruby - 将全局 $stdout 重新分配给控制台 - ruby - 2

    我正在尝试将$stdout设置为临时写入一个文件,然后返回到一个文件。test.rb:old_stdout=$stdout$stdout.reopen("mytestfile.out",'w+')puts"thisgoesinmytestfile"$stdout=old_stdoutputs"thisshouldbeontheconsole"$stdout.reopen("mytestfile1.out",'w+')puts"thisgoesinmytestfile1:"$stdout=old_stdoutputs"thisshouldbebackontheconsole"这是输出。r

  6. ruby - 解释为局部变量会覆盖方法名称吗? - 2

    如thisquestion,当在其自己的赋值中使用未定义的局部变量时,它的计算结果为nil。x=x#=>nil但是当局部变量的名称与现有的方法名称冲突时,就比较棘手了。为什么下面的最后一个示例返回nil?{}.instance_eval{a=keys}#=>[]{}.instance_eval{keys=self.keys}#=>[]{}.instance_eval{keys=keys}#=>nil 最佳答案 在Ruby中,因为可以在没有显式接收器和括号的情况下调用方法,所以在局部变量引用和无接收器无参数方法调用之间存在语法歧义:f

  7. ruby - 在模块/类之间共享全局记录器 - 2

    在许多ruby​​类之间共享记录器实例的最佳(正确)方法是什么?现在我只是将记录器创建为全局$logger=Logger.new变量,但我觉得有更好的方法可以在不使用全局变量的情况下执行此操作。如果我有以下内容:moduleFooclassAclassBclassC...classZend在所有类之间共享记录器实例的最佳方式是什么?我是以某种方式在Foo模块中声明/创建记录器还是只是使用全局$logger没问题? 最佳答案 在模块中添加常量:moduleFooLogger=Logger.newclassAclassBclassC..

  8. ruby - Sinatra 中的全局救援和日志记录异常 - 2

    如何在出现异常时指定全局救援,如果您将Sinatra用于API或应用程序,您将如何处理日志记录? 最佳答案 404可以在not_found方法的帮助下处理,例如:not_founddo'Sitedoesnotexist.'end500s可以通过调用带有block的错误方法来处理,例如:errordo"Applicationerror.Plstrylater."end错误的详细信息可以通过request.env中的sinatra.error访问,如下所示:errordo'Anerroroccured:'+request.env['si

  9. ruby - 为什么允许在 Ruby 类之外定义全局方法? - 2

    我读过这个:Let’sstartwithasimpleRubyprogram.We’llwriteamethodthatreturnsacheery,personalizedgreeting.defsay_goodnight(name)result="Goodnight,"+namereturnresultend我的理解是,方法是定义在类中的函数或子程序,可以关联到类(类方法)或对象(实例方法)。那么,如果它不是在类中定义的,怎么可能是方法呢? 最佳答案 当你在Ruby中以这种方式在全局范围内定义一个函数时,它在技术上变成了Obje

  10. ruby-on-rails - 未定义的局部变量或方法 "articles_path" - 2

    我正在尝试完成本教程:http://guides.rubyonrails.org/getting_started.html#say-hello-rails但是我遇到了一个错误:我有下一个错误:undefinedlocalvariableormethod`articles_path'for#:0x4661d30>Extractedsource(aroundline#1):rake路线:PrefixVerbURIPatternController#Actionwelcome_indexGET/welcome/index(.:format)welcome#indexarticles_newG

随机推荐