草庐IT

RabbitMQ实现消息的延迟推送或延迟发送

TBoy-rise 2023-06-10 原文

使用RabbitMQ实现消息的延迟消费

文章目录


前言

最近做的项目涉及到后台消息的推送和app端接受消息的功能。具体的要求是:后台向app用户推送的消息,app用户能在app里面的消息栏目查看到消息详情,后台推送的时候能选择定时推送和立即推送两种方案。由于公司的基础架构以及公司文化原因,我们选择了RabbitMQ来实现推送服务和消息盒子之间的通讯。

注意:由于知识有限,本篇文章只对RabbitMQ做基础的介绍,使用场景,以及代码实现,不会对它的高级用法和策略进行介绍。


一、RabbitMQ是什么?

1.RabbitMQ简介

RabbitMQ是有erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。
常见的消息队列有:RabbitMQ、Kafka 和 ActiveMQ

2.RabbitMQ的优点

RabbitMQ最初起源于金融系统,用于不同模块之间的消息通讯。
优点:

  • 可靠性:可持久化,消息传输和发布确认。
  • 灵活性:通过交换机将消息路由到对应的队列。
  • 集群:多台mq可组成集群,对外提供整体服务
  • 支持多语言:支持多种语言
  • 可界面操作:提供简易的用户操作界面
    等等。

3.常用组件

1.生产者(Producer):消息的制造者
2.消费者(Consumer):消息的消费者
3.消息(Message):消息对象,包括业务参数和mq的参数
4.队列(Queue):缓存或暂存储消息的容器
5.连接(Connection):应用服务和mq进行交互的tcp连接
6.信道(Channel):AMQP命令都是通过信道来完成的,它是一个虚拟的通道,来复用ctp连接。
7.交换机(Exchange):负责从生产者接收消息,根据路由规则发送到指定的队列里面。
8.路由键(Routing Key):交换机根据路由键将消息发往指定的队列。
9.虚拟主机(Virtual Host):就像是一个RabbitMQ 服务。

4.RabbitMQ的结构图

一张图介绍RabbitMQ组成以及各个组件之间的关系(参考网上画的)。

5.交换机的类型

交换机是用来发送消息到指定的队列里面的,它使用哪种路由算法是由交换机类型和绑定的规则所决定的。

A-直连交换机

直连交换机是根据交换机和队列之间绑定的路由键,来将消息发往指定的队列里面。如果交换机与多个队列绑定,则在发送携带路由键的消息时,只发给此路由键的队列,每个队列都是相同副本(比较适合一对一)。

例如:我用直连交换机test-direct-exchange根据路由键test-direct发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线

B-扇形交换机

扇形交换机是将消息发往与它绑定的队列,而不去理会绑定的路由键是否一致。如果交换机与多个队列绑定,每个队列都是相同副本,起到广播的作用。

例如:我用扇形交换机test-fanout-exchange根据路由键test-fanout发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
但是三个队列都收到了消息,可见扇形交换机会忽略其路由键

C-主题交换机

主题交换机是通过消息的路由键跟交换机和队列之间的绑定路由键进行匹配,将消息发给匹配上的队列,跟直连交换机的一对多相似,但是他的路由键可以支持模糊匹配。

例如:我用主题交换机test-topic-exchange根据路由键test.topic2发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
根据消息路由键和绑定的路由键进行模糊匹配,推送消息。

D-头交换机

头交换机是主题交换机有点相似,主题交换机是基于路由键,而头交换机是基于消息的headers数据,所以在发送消息给头交换机时指定Routing key是不起作用的。头交换机在绑定队列时需要指定参数Arguments,发送消息时需要指定headers和Arguments相匹配,消息才能被推到相应的队列。

例如:我用头交换机test-headers-exchange根据路由键test-headers1发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
如果前两个队列能收到消息,证明路由键不生效。

二、定时推送思路实现

定时推送,顾名思义就是要让消息的发送时间给延迟,要想发送延迟消息,就要用到延迟队列。RabbitMQ没有提供延迟队列,其有自带的死信队列和消息存活时间。
对于此需求有两种解决方案:一、使用插件,延迟队列来实现。二、使用消息存活时间和死信队列来实现。我是基于第二种方法来实现的。

1.Time To Live(TTL)

RabbitMQ可以根据队列或消息不同颗粒度来设置消息的生存时间,如果同时设置,则会根据最早的到期时间为准,将消息变为dead letter(死信)。
显而易见,针对消息设置生存时间更加灵活,我也是选用的这种方式。

2.Dead Letter Exchanges(DLX)

RabbitMQ可以在队列里配置死信交换机然后通过路由键绑定到另一个队列上,如果该队列里面的消息dead letter,就会通过交换机发往另一个队列里面,然后就可以消费了。

队列里面出现dead letter的情况有以下几种:

  • 消息或队列TTL过期
  • 队列长度达到最大
  • 消息被消费者拒绝消费

3.具体实现

实现消息延迟发送的具体思路是:首先创建一个交换机来当做死信交换机,再创建一个队列与这个死信交换机进行绑定就称作死信队列。其次创建一个交换机来当做正常交换机,在创建一个队列与这个正常交换机进行绑定,同时将死信交换机和死信路由键配置到这个正常队列里面。这样,当一条带有存活时间的消息通过正常交换机发送过来时,首先进入正常队列里面,然后到了存活时间,就会通过死信交换机根据路由键发送到死信队列里面,然后消费者消费死信队列里的消息,就达到了延迟消费的目的。

java代码实现。
1.首先创建maven项目,导入pom文件

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

2.配置配置文件(如果是yml,就用对应的书写规则)

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3.配置mq的相关组件

package com.wps.cn.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author wangps
 * @date 2022年11月22日 14:44
 */

@Configuration
public class QueueConfig {
    public static final String NORMAL_QUEUE_NAME = "normal_queue_name";
    public static final String NORMAL_EXCHANGE_NAME = "normal_exchange_name";
    public static final String NORMAL_ROUTING_KEY = "normal_routing_key";
    public static final String DLX_QUEUE_NAME = "dlx_queue_name";
    public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
    public static final String DLX_ROUTING_KEY = "dlx_routing_key";

    /**
     * 死信队列
     * @return
     */
    @Bean
    Queue dlxQueue() {
        return new Queue(DLX_QUEUE_NAME, true);
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean
    DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }

    /**
     * 绑定死信队列和死信交换机
     * @return
     */
    @Bean
    Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                .with(DLX_ROUTING_KEY);
    }

    /**
     * 普通消息队列
     * @return
     */
    @Bean
    Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        //设置消息过期时间,此方法是在队列的颗粒度设置,比较局限,所以在消息上设置过期时间
//        args.put("x-message-ttl", 1000*5);
        //设置死信交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        //设置死信 routing_key
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(NORMAL_QUEUE_NAME, true, false, false, args);
    }

    /**
     * 普通交换机
     * @return
     */
    @Bean
    DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE_NAME);
    }

    /**
     * 绑定普通队列和与之对应的交换机
     * @return
     */
    @Bean
    Binding nomalBinding() {
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(NORMAL_ROUTING_KEY);
    }
}


4.创建消费者

package com.wps.cn.consumer;

import com.rabbitmq.client.Channel;
import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author wangps
 * @date 2022年11月22日 14:55
 */
@Component
public class DlxConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

    @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
    public void process(String order, Message message, @Headers Map<String, Object> headers, Channel channel) {
        logger.info("订单号消息",  order);

        System.out.println("执行结束...."+message);

    }
}

5.创建controller当做生产者

package com.wps.cn.controller;

import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @author wangps
 * @date 2022年11月22日 15:50
 */
@RestController
@RequestMapping("/producer")
public class TestProducer {

    private static final Logger logger =  LoggerFactory.getLogger(TestProducer.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public Object submit(){

        String orderId = UUID.randomUUID().toString();
        logger.info("提交订单消息========",orderId);
        rabbitTemplate.convertAndSend(
                QueueConfig.NORMAL_EXCHANGE_NAME,QueueConfig.NORMAL_ROUTING_KEY,
                orderId,
                message -> {
                    message.getMessageProperties().setExpiration(1000*5+"");
                    return message;
                });
        return "{'orderId':'"+orderId+"'}";
    }

}

6.创建启动类

package com.wps.cn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DxlRabbitmqTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(DxlRabbitmqTestApplication.class, args);
    }

}

7.运行启动类,然后在页面访问,模拟推送消息
http://localhost:8080/producer/sendMessage
观察日志可以看出,消息发出后,在5s后消费者收到消息,从而达到延迟消费的情况。


总结

以上就是今天所讲的RabbitMQ实现延迟消费,本文主要使用了RabbitMQ的TTL和DLX来实现的。虽然说也是看了网上前辈们的总结,但是也有自己的实操经验,也是为了记录下来能够自己加深印象。
如果有不对的地方还请赐教,希望此篇文章能够对你有所帮助。

有关RabbitMQ实现消息的延迟推送或延迟发送的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  3. jquery - 我的 jquery AJAX POST 请求无需发送 Authenticity Token (Rails) - 2

    rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送

  4. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  5. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  6. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  7. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  8. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  9. ruby-on-rails - 在 Flash 警报 Rails 3 中显示错误消息 - 2

    如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]

  10. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

随机推荐