<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: wq
password: qifeng
virtual-host: /
#开启ack
listener:
direct:
acknowledge-mode: manual
prefetch: 1 # 限制一次拉取消息的数量
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
package com.wanqi.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true)
.build();
}
//
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//队列和交换机绑定关系
/*
知道哪个队列
知道哪个交换机
知道routing key
*/
@Bean
public Binding bindingQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
package com.wanqi;
import com.wanqi.mq.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootMqProducersApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.test", "boot mq hello~~~");
}
}
package com.wanqi.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = {"boot_queue"})
public void listenerQueue(Object msg, Message message, Channel channel) {
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(msg.toString());
System.out.println(new String(message.getBody()));
// int x = 3/0;
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, true, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
消息成为死信的三种情况
1、队列消息长度到达限制;
2、消费者拒接消费消息,并且不重回队列;
3、原队列存在消息过期设置,消息到达超时时间未被消费;
package com.wanqi.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Configuration
public class QDRabbitMQConfig {
//交换机名称
public static final String ITEM_EXCHANGE = "item_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
//队列名称
public static final String ITEM_QUEUE = "item_queue";
public static final String DEAD_QUEUE = "dead_queue";
//声明业务交换机
@Bean("itemExchange")
public Exchange itemExchange(){
return ExchangeBuilder.topicExchange(ITEM_EXCHANGE).durable(true).build();
}
//声明死信交换机
@Bean("deadExchange")
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();
}
/**
* 声明普通队列,设置队列消息过期时间,队列长度,绑定的死信队列
*/
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder
.durable(ITEM_QUEUE)
// 队列消息过期时间
.ttl(10000)
// 队列长度
.maxLength(15)
// 声明当前队列绑定的死信交换机
.deadLetterExchange(DEAD_EXCHANGE)
// 声明当前队列死信转发的路由key
.deadLetterRoutingKey("infoDead.haha")
.build();
}
/**
* 死信队列,消费者需要监听的队列
*/
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
//绑定队列和交换机(业务)
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("infoRouting.#").noargs();
}
//绑定队列和交换机(死信)
@Bean
public Binding deadQueueExchange(@Qualifier("deadQueue") Queue queue,
@Qualifier("deadExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("infoDead.#").noargs();
}
}
package com.wanqi;
import com.wanqi.mq.RabbitMQConfig;
import com.wanqi.mq.RabbitMQConfig2;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootMqProducersApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
// 原队列存在消息过期设置,消息到达超时时间未被消费;
@Test
void contextLoads3() {
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间,单位:毫秒
messageProperties.setExpiration("5000");
byte[] msgBytes = "rabbitmq ttl message ...".getBytes();
Message message = new Message(msgBytes, messageProperties);
//发送消息
rabbitTemplate.convertAndSend(QDRabbitMQConfig.ITEM_EXCHANGE,"infoRouting.hehe",message);
System.out.println("发送消息成功");
}
// 模拟队列消息长度到达限制
@Test
void contextLoads4() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(QDRabbitMQConfig.ITEM_EXCHANGE,"infoRouting.hehe",i + "---message");
}
}
}
package com.wanqi.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = {"dead_queue"})
public void listenerQueue2(Object msg, Message message, Channel channel) {
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(msg.toString());
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, true, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
package com.wanqi.mq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Configuration
public class TtlQueueConfig {
//普通交换机名称
public static final String X_CHANGE = "X_Exchange";
//死信交换机名称
public static final String Y_DEAD_CHANGE = "Y_Exchange";
//普通队列
public static final String QUEUE_A = "QA_QUEUE";
public static final String QUEUE_B = "QB_QUEUE";
//死信队列
public static final String DEAD_QUEUE_D = "QD_QUEUE";
//声明普通交换机
@Bean("xExchange")
public DirectExchange xExchange() {
return ExchangeBuilder.directExchange(X_CHANGE).durable(true)
.build();
}
//声明死信交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return ExchangeBuilder.directExchange(Y_DEAD_CHANGE).durable(true)
.build();
}
/**
* 声明队列,延迟10秒
*/
@Bean("queueA")
public Queue queueA() {
return QueueBuilder.durable(QUEUE_A)
.deadLetterExchange(Y_DEAD_CHANGE) //死信交换机
.deadLetterRoutingKey("YD") //死信RoutingKey
.ttl(10000) //消息过期时间
.build();
}
/**
* 声明队列,延迟40秒
*
* @return
*/
@Bean("queueB")
public Queue queueB() {
return QueueBuilder.durable(QUEUE_B)
.deadLetterExchange(Y_DEAD_CHANGE) //死信交换机
.deadLetterRoutingKey("YD") //死信RoutingKey
.ttl(40000) //消息过期时间
.build();
}
/**
* 死信队列,消费者需要监听的队列
*/
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_QUEUE_D).build();
}
//绑定 X_CHANGE绑定queueA
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//绑定 X_CHANGE绑定queueB
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//绑定 Y_CHANGE绑定queueD
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
@Test
void contextLoads5() {
String message = "延迟队列消息";
rabbitTemplate.convertAndSend(TtlQueueConfig.X_CHANGE, "XA", "TTL=10s的队列:" + message);
rabbitTemplate.convertAndSend(TtlQueueConfig.X_CHANGE, "XB", "TTL=40s的队列:" + message);
}
package com.wanqi.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* @Description TODO
* @Version 1.0.0
* @Date 2022/11/19
* @Author wandaren
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = "QD_QUEUE")
public void receiveMessage(Object msg,Message message, Channel channel){
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(msg.toString());
System.out.print("当前时间: " + new Date());
System.out.println(" 收到死信队列的消息:" + msg);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, true, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po