目录

Map<String, Object> arguments = new HashMap<>();
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttl);
return msg;
});


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
spring.rabbitmq.host=192.168.23.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webapi")
.apiInfo(webApiInfo())
.select()
.build();
}
public ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1551388580@qq.com"))
.build();
}
}
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列A
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>();
// 当前队列的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
// 声明队列A绑定交换机X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列B
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 声明队列B绑定交换机X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明死信队列
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
// 声明死信队列 QD 绑定关系
public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,
@Qualifier("yExchange")DirectExchange exchange) {
return BindingBuilder.bind(queueD).to(exchange).with("YD");
}
}
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);
}
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间 关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件六、延迟队列优化
(一)代码架构图

(二)配置文件类
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
public static final String QUEUE_C = "QC";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// 声明队列A
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>();
// 当前队列的死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
// 声明队列A绑定交换机X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列B
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
// 声明队列的TTL
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
// 声明队列B绑定交换机X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明队列C
@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 当前队列的死信路由key
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
// 声明队列C绑定交换机X
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange")DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 声明死信队列
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
// 声明死信队列 QD 绑定关系
public Binding queuedBindingY(@Qualifier("queueD")Queue queueD,
@Qualifier("yExchange")DirectExchange exchange) {
return BindingBuilder.bind(queueD).to(exchange).with("YD");
}
}
(三)消息生产者
@GetMapping("/sendExpirationMsg/{message}/{ttl}")
public void sendMsg(@PathVariable String message, @PathVariable String ttl) {
log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttl);
return msg;
});
}
七、Rabbitmq 插件实现延迟队列

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

/*
* 基于插件的延迟队列和延迟交换机
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 声明自定义交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 声明队列和延迟交换机的绑定
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue,
@Qualifier("delayedExchange")CustomExchange exchange) {
return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
@Component
@Slf4j
public class DelayedQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(String message) {
log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message);
}
}
第二个消息被先消费掉了,符合预期
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
是否可以在所有delayed_job任务之前运行一个方法?基本上,我们试图确保每个运行delayed_job的服务器都有我们代码的最新实例,所以我们想运行一个方法来在每个作业运行之前检查它。(我们已经有了“check”方法并在别处使用它。问题只是关于如何从delayed_job中调用它。) 最佳答案 现在有一种官方方法可以通过插件来做到这一点。这篇博文通过示例清楚地描述了如何执行此操作http://www.salsify.com/blog/delayed-jobs-callbacks-and-hooks-in-rails(本文中描述
我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几
我有一些使用delayed_job的小程序。在我的本地主机上一切正常,但是当我将我的应用程序部署到Heroku并单击应该由delayed_job执行的链接时,没有任何反应,“任务”只是保存到表delayed_job中。Inthisarticleonherokublog写入时,执行delayed_job表中的任务,当运行此命令时rakejobs:work。但是我怎样才能运行这个命令呢?命令应该放在哪里?在代码中,还是从终端控制台? 最佳答案 如果您正在运行Cedar堆栈,请从终端控制台运行以下命令:herokurunrakejobs:
我正在尝试为现有队列编写消费者。RabbbitMQ在一个单独的实例中运行,名为“org-queue”的队列已经创建并绑定(bind)到一个交换器。org-queue是一个持久队列,它还有一些额外的属性。现在我需要从这个队列接收消息。我使用下面的代码来获取队列的实例conn=Bunny.newconn.startch=conn.create_channelq=ch.queue("org-queue")它抛出一个错误,指出不同的耐用属性。默认情况下,Bunny似乎使用durable=false。所以我添加了durabletrue作为参数。现在它说明了其他参数之间的区别。我是否需要指定所有参
我知道我们可以做到:sidekiq_optionsqueue:"Foo"但在这种情况下,Worker只分配给一个队列:“Foo”。我需要在特定队列中分配作业(而不是worker)。使用Resque很容易:Resque.enqueue_to(queue_name,my_job)另外,为了并发问题,我需要限制每个队列的Worker数量为1。我该怎么做? 最佳答案 您可能会使用https://github.com/brainopia/sidekiq-limit_fetch然后:Sidekiq::Client.push({'class'=>
题目描述小张买了 n 件白色的衣服,他觉得所有衣服都是一种颜色太单调,希望对这些衣服进行染色,每次染色时,他会将某种颜色的所有衣服寄去染色厂,第 i 件衣服的邮费为 ai 元,染色厂会按照小张的要求将其中一部分衣服染成同一种任意的颜色,之后将衣服寄给小张,请问小张要将 n 件衣服染成不同颜色的最小代价是多少?输入描述第一行为一个整数 n ,表示衣服的数量。第二行包括 n 个整数a1,a2...an 表示第 i 件衣服的邮费为 ai 元。(1≤n≤10^5,1≤ai≤10^9 )输出描述输出一个整数表示小张所要花费的最小代价。输入输出样例输入551321输出25 思考🤔:题意:意思是
我是一个尝试使用delayed_job的NOOB。我想在使用延迟作业成功发送邮件后更新用户模型。发送邮件:UserMailer.delay.welcome_email(user)如果邮件发送成功,请执行以下操作:User.update_attributes(:emailed=>true)邮件发送成功后如何回调或触发? 最佳答案 您需要创建一个Job对象而不是调用#delay帮助程序。您可以使用successHook来执行回调。classWelcomeEmailJob 关于ruby-on-
我在运行多个工作器的设置中使用延迟作业。就我的问题而言,这并不重要,但假设我有10个worker(目前在开发模式下这样做)。我遇到的问题是两个不同的工作人员有时会开始处理同一个工作,调用我的工作对象的perform方法。据我所知,DelayedJob正在使用悲观锁定来防止这种情况发生,但有时它似乎仍然有足够的时间在第一个worker有时间实际锁定它之前锁定它。我只是想看看有没有其他人遇到过这个问题,或者是我的设置有问题。我正在使用Postrgres,这发生在我的开发机器和我托管它的Heroku上。我会尝试在我的工作中解决这个问题,但发生这种情况仍然有点问题。理想情况下,延迟作业永远不会
defperformrefund_log={success:refund_retry.success?,amount:refund_amount,action:"refund"}ifrefund_retry.success?refund_log[:reference]=refund_retry.transaction.idrefund_log[:message]=refund_retry.transaction.statuselserefund_log[:message]=refund_retry.messagerefund_log[:params]={}refund_retry.er