草庐IT

实战,SpringBoot + RabbitMQ死信队列实现超时关单

Java佳佳 2023-10-27 原文

需求背景之为什么要有超时关单

原因一:

第三方支付平台的支付连接都是有时效性,创建订单后,需要在一定的时间内支付完成, 比如微信支付、支付宝支付等。当然也可以不关闭订单,做订单二次支付的操作,但业务链路会更加复杂,所以一般会直接关闭

原因二:

电商业务里面还会涉及到商品库存的锁定和释放

所以多数订单业务都是会有这个功能,那如何设计呢?

RabbitMQ死信队列-延迟消息知识点回顾

什么是RabbitMQ的死信队列

没有被及时消费的消息存放的队列

什么是rabbitmq的死信交换机

Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机

消息有哪几种情况成为死信

消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live) 队列的消息长度达到极限 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

什么是延迟队列

一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer 进行消费,该消息即延时消息

业界的一些实现方式:

  • 定时任务高精度轮训

  • redis监听key过期

  • jdk自带的DelayQueue

  • 采用RocketMQ自带延迟消息功能

  • RabbitMQ本身是不支持延迟队列的, 结合死信队列的特性,就可以做到延迟消息

代码实战环节

Rabbitmq 死信队列配置

import java.util.HashMap;
import java.util.Map;
 
/**
 *  * 自定义消息队列配置,
 *  * 发送 关单消息-》延迟exchange-》order.close.delay.queue-》死信exchange-》order.close.queue
 **/
 
@Configuration
@Data
public class RabbitMQConfig {
    /**
     * 交换机
     */
    private String orderEventExchange="order.event.exchange";
    /**
     * 延迟队列,不能被消费者监听
     */
    private String orderCloseDelayQueue = "order.close.delay.queue";
    /**
     * 关单队列,延迟队列的消息过期后转发的队列,用于被消费者监听
     */
    private String orderCloseQueue = "order.close.queue";
    /**
     * 进入到延迟队列的routingKey
     */
    private String orderCloseDelayRoutingKey = "order.close.delay.routing.key";
    /**
     * 进入死信队列的routingKey,消息过期进入死信队列的key
     */
    private String orderCloseRoutingKey = "order.close.routing.key";
    /**
     * 过期时间,毫秒单位,临时改为1分钟过期
     */
    private Integer ttl = 1000 * 60;
    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 创建交换机,topic类型,一般一个业务一个交换机
     * @return
     */
    @Bean
    public Exchange orderEventExchange(){
        return new TopicExchange(orderEventExchange,true,false);
    }
    /**
     * 延迟队列getOrderEventExchange
     * @return
     */
    @Bean
    public Queue orderCloseDelayQueue(){
        Map<String,Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange",orderEventExchange);
        args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
        args.put("x-message-ttl",ttl);
        return new Queue(orderCloseDelayQueue,true,false,false,args);
 
    }
    /**
     * 死信队列,是一个普通队列,用于被监听
     * @return
     */
    @Bean
    public Queue orderCloseQueue(){
        return new Queue(orderCloseQueue,true,false,false);
    }
    /**
     * 第一个队列 即延迟队列和交换机建立绑定关系
     * @return
     */
    @Bean
    public Binding orderCloseDelayBinding(){
        return new Binding(orderCloseDelayQueue,
                Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
    }
    /**
     * 死信队列和死信交换机建立绑定关系
     * @return
     */
    @Bean
    public Binding orderCloseBinding(){
 
        return new Binding(orderCloseQueue,
                Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
    }
 
 
}
##----------rabbit配置--------------
spring.rabbitmq.host=120.79.xxx.xxx
spring.rabbitmq.port=5672

#需要手工创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=admin
spring.rabbitmq.password=password

#消息确认方式,manual(手动ack) 和auto(自动ack); 消息消费重试到达指定次数进到异常交换机和异常队列,需要改为自动ack确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=auto

延迟消息的发送

@Autowired
private RabbitMQConfig rabbitMQConfig;
@Autowired
private RabbitTemplate rabbitTemplate;

//发送延迟消息
EventMessage eventMessage = EventMessage.builder()
    .eventMessageType(EventMessageType.PRODUCT_ORDER_NEW.name())
    .accountNo(loginUser.getAccountNo())
    .bizId(orderOutTradeNo)
    .build();

rabbitTemplate.convertAndSend(rabbitMQConfig.getOrderEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),eventMessage);

关单消费者处理

@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("order.close.queue")})
public class ProductOrderMQListener {
 
    @Autowired
    private ProductOrderService productOrderService;
 
    @RabbitHandler
    public void productOrderHandler(EventMessage eventMessage, Message message, Channel channel){
        log.info("监听到消息ProductOrderMQListener messsage消息内容:{}",message);
        try{
            //关闭订单 业务逻辑
            productOrderService.closeProductOrder(eventMessage);
        }catch (Exception e){
            log.error("消费者失败:{}",eventMessage);
            throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
        }
        log.info("消费成功:{}",eventMessage);
    }
}

监听到延迟的关单消息后,根据实际业务进行查询订单、支付等确定是否要进行关单操作。

 

有关实战,SpringBoot + RabbitMQ死信队列实现超时关单的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

  3. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  4. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  5. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  6. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  7. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

  8. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  9. ruby-on-rails - Rails 优雅地处理超时 session ? - 2

    使用rails4,ruby2。我在rails配置中为我的cookiesession设置了30分钟的超时时间。问题是,如果我转到表单,让session超时,然后提交表单,我会收到此ActionController::InvalidAuthenticityToken错误。如何在Rails中优雅地处理这个错误?比如说,重定向到登录屏幕? 最佳答案 在您的ApplicationController:rescue_fromActionController::InvalidAuthenticityTokendoredirect_tosome_p

  10. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

随机推荐