草庐IT

RabbitMQ延迟队列

小影~ 2023-06-20 原文

目录

💌 介绍

💒 使用场景

🏳‍🌈 模拟案例

📕 准备工作

🏴 写法一(死信队列TTL)

 RabbitMQ配置文件

 生产者

消费者

测试

🏴 写法二 (死信队列TTL)

 RabbitMQ配置文件

生产者

消费者

测试

🚩 写法三 (插件版本-推荐)

插件安装

RabbitMQ配置文件

生产者

消费者

测试

👍 延迟队列方法推荐 


💌 介绍

顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

💒 使用场景

  • 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
  • 用户注册成功后,如果3天没有登录则进行短信提醒
  • 优惠券过期前发送短信进行提醒
  • ....

以上场景都可以用延时队列来完成


🏳‍🌈 模拟案例

需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费

📕 准备工作

导入RabbitMQ依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 配置RabbitMQ连接相关信息

#MySQL
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672 
    username: xxxx
    password: xxx

server:
  port: 8087

🏴 写法一(死信队列TTL)

生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

 RabbitMQ配置文件

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author 小影
 * @create: 2022/8/18 10:26
 * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key
 */
@Configuration
public class RabbitMQConfiguration {
   // 延迟交换机
   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
   // 延迟队列
   public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";
   public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";
   // 延迟队列路由key
   public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";
   public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";

   // 死信交换机
   public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
   // 死信队列
   public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";
   public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";
   // 私信队列路由key
   public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";
   public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";

   // 声明延迟交换机
   @Bean("delayExchange")
   public DirectExchange delayExchange() {
      return new DirectExchange(DELAY_EXCHANGE_NAME);
   }

   // 声明死信交换机
   @Bean("deadLetterExchange")
   public DirectExchange deadLetterExchange() {
      return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
   }

   // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机
   @Bean("delayQueueA")
   public Queue delayQueueA() {
      HashMap<String, Object> args = new HashMap<>();
      // 声明队列绑定的死信交换机
      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
      // 声明队列的属性路由key
      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);
      // 声明队列的消息TTL存活时间
      args.put("x-message-ttl", 10000);
      return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();
   }

   // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机
   @Bean("delayQueueB")
   public Queue delayQueueB() {
      HashMap<String, Object> args = new HashMap<>();
      // 声明队列绑定的死信交换机
      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
      // 声明队列的属性路由key
      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);
      // 声明队列的消息TTL存活时间
      args.put("x-message-ttl", 60000);
      return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();
   }

   // 声明死信队列A,用于接收延迟10S的消息
   @Bean("deadLetterQueueA")
   public Queue deadLetterQueueA() {
      return new Queue(DEAD_LETTER_QUEUE_NAME_A);
   }

   // 声明死信队列B,用于接收延迟60S的消息
   @Bean("deadLetterQueueB")
   public Queue deadLetterQueueB() {
      return new Queue(DEAD_LETTER_QUEUE_NAME_B);
   }

   // 设置延迟队列A的绑定关系
   @Bean
   public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                @Qualifier("delayExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);
   }

   // 设置延迟队列B的绑定关系
   @Bean
   public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                @Qualifier("delayExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);
   }

   // 设置死信队列A的绑定关系
   @Bean
   public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                @Qualifier("deadLetterExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);
   }
   // 设置死信队列B的绑定关系
   @Bean
   public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);
   }
}

此配置文件的代码关系图如下

 生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;
/**
 * @author 小影
 * @create: 2022/8/18 11:13
 * @describe:延迟消息生产者
 */
@Component
public class DelayMessageProducer {

   @Resource
   private RabbitTemplate rabbitTemplate;

   public void send(String message,int type) {
      switch (type){
         case 1: // 10s的消息
            // param:队列名称、路由key、消息
            rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);
            break;
         case 2:// 60s的消息
            rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);
            break;
      }
   }
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;

/**
 * @author 小影
 * @create: 2022/8/18 11:19
 * @describe:死信消费者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

   /**
    * 监听私信队列A
    * @param message
    * @param channel 作手动回执、确认
    */
   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)
   public void receiveA(Message message, Channel channel) {
      String msg = new String(message.getBody());
      log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);
   }

   /**
    * 监听私信队列B
    * @param message
    * @param channel 作手动回执、确认
    */
   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)
   public void receiveB(Message message, Channel channel) {
      String msg = new String(message.getBody());
      log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);
   }
}

测试

@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
   @Resource
   private DelayMessageProducer producer;

   @GetMapping("send")
   public void send(String message, Integer type) {
      log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));
      producer.send(message, type);
   }
}

分别请求

http://localhost:8089/rabbitmq/send?message=我是10秒&type=1

http://localhost:8089/rabbitmq/send?message=我是60秒&type=2

如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay.exchange' in vhost '/': received ''x-delayed-message'' but current is 'direct', class-id=40, method-id=10

可能是mq已经存在交换机了先去删掉

弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


🏴 写法二 (死信队列TTL)

生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

 RabbitMQ配置文件

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author 小影
 * @create: 2022/8/18 10:26
 * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key
 */
@Configuration
public class RabbitMQConfiguration {
   // 延迟交换机
   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
   // 延迟队列
   public static final String DELAY_QUEUE_NAME = "delay.queue";
   // 延迟队列路由key
   public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";

   // 死信交换机
   public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
   // 死信队列
   public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
   // 私信队列路由key
   public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";

   // 声明延迟交换机
   @Bean("delayExchange")
   public DirectExchange delayExchange() {
      return new DirectExchange(DELAY_EXCHANGE_NAME);
   }

   // 声明死信交换机
   @Bean("deadLetterExchange")
   public DirectExchange deadLetterExchange() {
      return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
   }

   // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机
   @Bean("delayQueue")
   public Queue delayQueue() {
      HashMap<String, Object> args = new HashMap<>();
      // 声明队列绑定的死信交换机
      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
      // 声明队列的属性路由key
      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
      return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
   }


   // 声明死信队列
   @Bean("deadLetterQueue")
   public Queue deadLetterQueue() {
      return new Queue(DEAD_LETTER_QUEUE_NAME);
   }


   // 设置延迟队列的绑定关系
   @Bean
   public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
                               @Qualifier("delayExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
   }


   // 设置死信队列的绑定关系
   @Bean
   public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
   }

}

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
/**
 * @author 小影
 * @create: 2022/8/18 11:13
 * @describe:延迟消息生产者
 */
@Component
public class DelayMessageProducer {

   @Resource
   private RabbitTemplate rabbitTemplate;

   /**
    *
    * @param message 消息
    * @param delayTime 存活时间
    */
   public void send(String message,String delayTime) {
      // param:延迟交换机,路由KEY,存活时间
      rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
         msg.getMessageProperties().setExpiration(delayTime);
         return msg;
      });
   }
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;

/**
 * @author 小影
 * @create: 2022/8/18 11:19
 * @describe:死信消费者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

   /**
    * 监听私信队列A
    * @param message
    * @param channel 作手动回执、确认
    */
   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)
   public void receiveA(Message message, Channel channel) {
      String msg = new String(message.getBody());
      log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);
   }

}

测试

@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
   @Resource
   private DelayMessageProducer producer;
   @GetMapping("send")
   public void send(String message, String delayTime) {
      log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
      producer.send(message, delayTime);

   }
}

分别请求

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来


🚩 写法三 (插件版本-推荐)

安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。 

插件安装

1.进入mq官网社区插件:Community Plugins — RabbitMQ

2.找到rabbitmq_delayed_message_exchange

 选择对应版本的ez文件下载

 Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

 

 注:我的MQ是通过yum安装的

 1.在系统中查看安装的rabbitmq

rpm -qa |grep rabbitmq

 2.查询mq的安装的相关文件目录

rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

 翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

 然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

3.重启RabbitMQ服务

systemctl restart rabbitmq-server.service

4.重启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 


RabbitMQ配置文件

/**
 * @author 小影
 * @create: 2022/8/18 10:26
 * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key
 */
@Configuration
public class RabbitMQConfiguration {
   // 延迟交换机
   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
   // 延迟队列
   public static final String DELAY_QUEUE_NAME = "delay.queue";
   // 延迟队列路由key
   public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";

   // 声明延迟交换机
   @Bean("delayExchange")
   public CustomExchange delayExchange() {
      HashMap<String, Object> args = new HashMap<>();
      args.put("x-delayed-type", "direct");
      return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
   }


   // 声明延迟队列
   @Bean("delayQueue")
   public Queue delayQueue() {
      return new Queue(DELAY_QUEUE_NAME);
   }


   // 设置延迟队列的绑定关系
   @Bean
   public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
                               @Qualifier("delayExchange") CustomExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
   }
}

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
/**
 * @author 小影
 * @create: 2022/8/18 11:13
 * @describe:延迟消息生产者
 */
@Component
public class DelayMessageProducer {

   @Resource
   private RabbitTemplate rabbitTemplate;

   /**
    *
    * @param message 消息
    * @param delayTime 存活时间
    */
   public void send(String message,Integer delayTime) {
      // param:延迟交换机,路由KEY,存活时间
      rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
         msg.getMessageProperties().setDelay(delayTime);
         return msg;
      });
   }
}

消费者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;

/**
 * @author 小影
 * @create: 2022/8/18 11:19
 * @describe:消费者
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

   /*
    * 监听私信队列
    * @param message
    * @param channel 作手动回执、确认
    */
   @RabbitListener(queues = DELAY_QUEUE_NAME)
   public void receiveA(Message message, Channel channel) {
      String msg = new String(message.getBody());
      log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);
   }

}

测试

@Slf4j
@RestController
@RequestMapping("rabbitmq")
public class RabbitMqController {
   @Resource
   private DelayMessageProducer producer;
   @GetMapping("send")
   public void send(String message, Integer delayTime) {
      log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
      producer.send(message, delayTime);

   }
}

启动项目查看rabbitmq的可视化界面

如下图此时生成的交换机是x-delayed-message类型的

 分别发送:

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

 结局并不是60秒先被消费,完成了我们的意愿。

原理:

  1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
  2. 交换机里面的插件就会一直监听这个时间
  3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

👍 延迟队列方法推荐 

 这是小编在开发学习使用和总结,  这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!

有关RabbitMQ延迟队列的更多相关文章

  1. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  2. ruby-on-rails - 在所有延迟的作业之前 Hook - 2

    是否可以在所有delayed_job任务之前运行一个方法?基本上,我们试图确保每个运行delayed_job的服务器都有我们代码的最新实例,所以我们想运行一个方法来在每个作业运行之前检查它。(我们已经有了“check”方法并在别处使用它。问题只是关于如何从delayed_job中调用它。) 最佳答案 现在有一种官方方法可以通过插件来做到这一点。这篇博文通过示例清楚地描述了如何执行此操作http://www.salsify.com/blog/delayed-jobs-callbacks-and-hooks-in-rails(本文中描述

  3. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  4. ruby - Heroku - 如何开始工作人员(延迟工作)? - 2

    我有一些使用delayed_job的小程序。在我的本地主机上一切正常,但是当我将我的应用程序部署到Heroku并单击应该由delayed_job执行的链接时,没有任何反应,“任务”只是保存到表delayed_job中。Inthisarticleonherokublog写入时,执行delayed_job表中的任务,当运行此命令时rakejobs:work。但是我怎样才能运行这个命令呢?命令应该放在哪里?在代码中,还是从终端控制台? 最佳答案 如果您正在运行Cedar堆栈,请从终端控制台运行以下命令:herokurunrakejobs:

  5. ruby - 在不提供其所有属性的情况下获取队列 - 2

    我正在尝试为现有队列编写消费者。RabbbitMQ在一个单独的实例中运行,名为“org-queue”的队列已经创建并绑定(bind)到一个交换器。org-queue是一个持久队列,它还有一些额外的属性。现在我需要从这个队列接收消息。我使用下面的代码来获取队列的实例conn=Bunny.newconn.startch=conn.create_channelq=ch.queue("org-queue")它抛出一个错误,指出不同的耐用属性。默认情况下,Bunny似乎使用durable=false。所以我添加了durabletrue作为参数。现在它说明了其他参数之间的区别。我是否需要指定所有参

  6. ruby - 如何在特定队列中推送作业并使用 sidekiq 限制工作人员数量? - 2

    我知道我们可以做到:sidekiq_optionsqueue:"Foo"但在这种情况下,Worker只分配给一个队列:“Foo”。我需要在特定队列中分配作业(而不是worker)。使用Resque很容易:Resque.enqueue_to(queue_name,my_job)另外,为了并发问题,我需要限制每个队列的Worker数量为1。我该怎么做? 最佳答案 您可能会使用https://github.com/brainopia/sidekiq-limit_fetch然后:Sidekiq::Client.push({'class'=>

  7. Python:每日一题之小张的衣服(优先队列、哈夫曼编码) - 2

    题目描述小张买了 n 件白色的衣服,他觉得所有衣服都是一种颜色太单调,希望对这些衣服进行染色,每次染色时,他会将某种颜色的所有衣服寄去染色厂,第 i 件衣服的邮费为 ai​ 元,染色厂会按照小张的要求将其中一部分衣服染成同一种任意的颜色,之后将衣服寄给小张,请问小张要将 n 件衣服染成不同颜色的最小代价是多少?输入描述第一行为一个整数 n ,表示衣服的数量。第二行包括 n 个整数a1​,a2​...an​ 表示第 i 件衣服的邮费为 ai​ 元。(1≤n≤10^5,1≤ai​≤10^9 )输出描述输出一个整数表示小张所要花费的最小代价。输入输出样例输入551321输出25 思考🤔:题意:意思是

  8. ruby-on-rails - 成功发送延迟作业电子邮件后更新用户表 - 2

    我是一个尝试使用delayed_job的NOOB。我想在使用延迟作业成功发送邮件后更新用户模型。发送邮件:UserMailer.delay.welcome_email(user)如果邮件发送成功,请执行以下操作:User.update_attributes(:emailed=>true)邮件发送成功后如何回调或触发? 最佳答案 您需要创建一个Job对象而不是调用#delay帮助程序。您可以使用successHook来执行回调。classWelcomeEmailJob 关于ruby-on-

  9. ruby-on-rails - 启动同一作业的多个延迟作业进程 - 2

    我在运行多个工作器的设置中使用延迟作业。就我的问题而言,这并不重要,但假设我有10个worker(目前在开发模式下这样做)。我遇到的问题是两个不同的工作人员有时会开始处理同一个工作,调用我的工作对象的perform方法。据我所知,DelayedJob正在使用悲观锁定来防止这种情况发生,但有时它似乎仍然有足够的时间在第一个worker有时间实际锁定它之前锁定它。我只是想看看有没有其他人遇到过这个问题,或者是我的设置有问题。我正在使用Postrgres,这发生在我的开发机器和我托管它的Heroku上。我会尝试在我的工作中解决这个问题,但发生这种情况仍然有点问题。理想情况下,延迟作业永远不会

  10. ruby - 延迟作业每次引发错误时都会创建空气制动器 - 2

    defperformrefund_log={success:refund_retry.success?,amount:refund_amount,action:"refund"}ifrefund_retry.success?refund_log[:reference]=refund_retry.transaction.idrefund_log[:message]=refund_retry.transaction.statuselserefund_log[:message]=refund_retry.messagerefund_log[:params]={}refund_retry.er

随机推荐