





















Messaging that just works — RabbitMQ
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"hello_world",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "HelloWorld~~~~~~~~~~";
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
"",
// 路由名称
"hello_world",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 释放资源
channel.close();
connection.close();
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"hello_world",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
"hello_world",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}


package com.www.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerWorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"WorkQueue",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 0; i < 10; i++) {
// 发送消息内容
String body = "WorkQueue~~~~~~~~~~" + i;
// 6、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
"",
// 路由名称
"WorkQueue",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
}
// 释放资源
channel.close();
connection.close();
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerWorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建队列 Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
// 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare(
// 队列名称
"WorkQueue",
// 是否持久化,当mq重启之后,还在
true,
// 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
false,
// 是否自动删除。当没有Consumer时,自动删除掉
false,
// 参数
null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
"WorkQueue",
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}

package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerPubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT("direct"),:定向
// FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"),通配符的方式
// HEADERS("headers");参数匹配
BuiltinExchangeType.FANOUT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
""
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
""
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
"",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue1Name = "test_fanout_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue1Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerPubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}


package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Routing 工作模式
* <p>
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerRouting {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT("direct"),:定向
// FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"),通配符的方式
// HEADERS("headers");参数匹配
BuiltinExchangeType.DIRECT,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// 队列1
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"error"
);
// 队列2
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"info"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"error"
);
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"waring"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
"waring",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_direct_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerRouting2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}

package com.www.mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Topics 通配符工作模式
* <p>
* 生产者:发送消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ProducerTopics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 5、创建交换机
/*
exchangeDeclare(
String exchange,BuiltinExchangeType type,
boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments
)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
// 交换机名称
String exchangeName = "test_topic";
channel.exchangeDeclare(
// 交换机名称
exchangeName,
// type:交换机类型
// DIRECT("direct"),:定向
// FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"),通配符的方式
// HEADERS("headers");参数匹配
BuiltinExchangeType.TOPIC,
// 是否持久化
true,
// 内部使用
false,
// 参数
null
);
// 6、创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(
// 队列名
queue1Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
channel.queueDeclare(
// 队列名
queue2Name,
// 是否持久
true,
// 是否独占
false,
//是否自动删除
false,
// 参数
null
);
// 7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
// * :表示一个单词
// # :表示0或多个单词
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
// 队列1
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"#.error"
);
channel.queueBind(
// 队列名
queue1Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"order.*"
);
// 队列2
channel.queueBind(
// 队列名
queue2Name,
// 交换机名
exchangeName,
// routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
"*.*"
);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
// 发送消息内容
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 8、发送消息
channel.basicPublish(
// 交换机名称。简单模式下交换机会使用默认的 ""
exchangeName,
// 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
"order.info",
// 配置信息
null,
// 发送消息数据
body.getBytes()
);
// 9、释放资源
channel.close();
connection.close();
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerTopic1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_topic_queue1";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
package com.www.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:获取消息
*
* @author Www
* @version 1.8
* @since 2023/2/15 20:41 星期三
*/
public class ConsumerTopic2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置参数
// IP 默认值 localhost
connectionFactory.setHost("192.168.36.100");
// 端口 默认 5672
connectionFactory.setPort(5672);
// 虚拟机 默认值 /
connectionFactory.setVirtualHost("/ljt");
// 用户名 默认值 guest
connectionFactory.setUsername("ljt");
// 密码 默认值 guest
connectionFactory.setPassword("ljt");
// 3、创建连接 Connection : 受检异常——> 抛出
Connection connection = connectionFactory.newConnection();
// 4、创建通道 Channel
Channel channel = connection.createChannel();
// 队列名
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 5、接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* <p>
* 回调方法,当收到消息后,会自动执行该方法
* 1. consumerTag:标识
* 2. envelope:获取一些信息,交换机,路由key...
* 3. properties:配置信息
* 4. body:数据
* </p>
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(
// 队列名称
queue2Name,
// 是否自动确认
true,
// 回调对象
consumer
);
// 不需要关闭资源
}
}
绝对详细的RabbitMQ实践操作手册,看完本系列就够了。一、什么是MQ?1、MQ的概念2、理解消息队列二、MQ的优势和劣势1、优势和作用2、劣势三、MQ的应用场景四、AMQP五、工作原理一、什么是MQ?1、MQ的概念MQ全称MessageQueue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。下面用图来理解异步通信,并阐明与同步通信的区别。同步通信:甲乙两人面对面交流,你一句我一句必须同步进行,两人除此之外不做任何事情异步通信:异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系,消息传给第三方后,两人可以做其他自己想做的事情,当需要获取
RabbitMQ+WebStomp很棒。但是,我有一些主题我希望安全为只读或只写。似乎保护这些的唯一机制是使用rabbitmqctl。我可以创建一个虚拟主机,一个用户,然后应用一些权限。然而,这就是Stomp和Rabbit实现开始崩溃的地方。主题采用以下形式:stomp中的/topic/blah,它使用路由键“blah”路由到Rabbit中的“amq.topic”。似乎没有办法为路由key设置权限。似乎:rabbitmqctlset_permissions-pvhostuser".*"".*""^amq\.topic"是我能做的最好的,这仍然是“所有”主题。我也研究过交换,但没有办法在
1.延迟队列延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。很可惜,在RabbitMQ中并未提供延迟队列功能,但是我们有其他的方式可以实现延迟队列,方法就是TTL+死信队列,组合实现延迟队列的效果。2.什么是TTLTTL,全称TimeToLive,消息过期时间设置。消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。队列过期后,会将队列所有消息全部移除。一个队列中某一个消息过期后,只有消息在队列顶端,才会判断
我正在使用Titaniumappcelerator实现移动应用程序。在此应用中,我需要使用RabbitMQ实现聊天。我看到了很多关于如何使用节点js连接到RabbitMQ服务器的示例,但在我的例子中它不适用,我找不到任何JavaScript独立客户端。我想知道是否有一种方法可以在没有客户端的情况下连接到RabbitMQ,或者是否有任何JavaScript独立客户端? 最佳答案 是RabbitMQWebSTOMP吗?你在找什么?使用SockJS,即使在旧浏览器中也能正常工作,不需要Node.js或您编写的任何网络服务器端代码。
一、先检查一下Eralng是否安装好并配置好环境 新建系统变量名为:ERLANG_HOME变量值为erlang安装地址 双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。检查是否安装好:二、1、管理员运行cmd然后打开RabbitMQ安装目录\sbin2、rabbitmq-service.batremove3、setRABBITMQ_BASE=D:\rabbitmq_server\data(D:\rabbitmq_server\data是我自己自定义的目录,这样就绕过c\user\用中文用户名…这个文件夹,这样就不会有中文路径的问题了)4、rabbitm
RabbitMQ在wifi网络中运行良好,我在运行ArchLinux的笔记本电脑中设置了RabbitMQ。但是,当我在不同的wifi网络中使用同一台笔记本电脑时,出现以下错误:machinery:worker.go:42:Goingtoretrylaunchingtheworker.Error:Dial:dialtcp0.0.0.0:5672:getsockopt:connectionrefusedmachinery:retry.go:20:Retryinginxseconds我不知道为什么RabbitMQ在一个网络中工作而不在另一个网络中工作。我尝试更改/etc/rabbitmq/r
这个问题在这里已经有了答案:HowtodetectdeadRabbitMQconnection?(4个答案)关闭9个月前。我正在使用github.com/streadway/amqp对于我的程序。在重新初始化之前,我应该如何确保用于消费和/或生产的channel仍在工作?例如,在ruby中,我可以简单地做:bunny_client=Bunny.new({....})bunny_client.start启动客户端,ifnotbunny_clientorbunny_client.status!=:connected#re-initializetheclient如何使用streadway
我正在使用rabbitmq和golang,我在comsumer端创建了一个等待消息的线程,例如rabbitmq示例。我想主线程将等待kill信号并将信号发送到rabbitmq线程,但如果消息未推送到队列中,rabbitmq线程将永远等待,因此它不会处理kill信号消息。我发现rabbitmq在comsumer中有超时,但我刚刚在java中找到了代码,而不是golang。你能帮助我吗?谢谢。 最佳答案 如果我对您的问题的理解正确,您希望能够退出例行处理队列?超时设置在这里不起作用。这是针对服务器/客户端心跳的,只有在任何时候都没有收到
我的项目采用在GoogleCloud中运行的微服务器架构。我正在考虑从使用RabbitMQ的容器迁移到PubSub引擎。问题是:是否可以一条一条的接收消息?我的代码是用Go和docs编写的说Thecallbackisinvokedconcurrentlybymultiplegoroutines,maximizingthroughput.但是可以调用多少个goroutines呢?如何设置允许的最大值?例如。我的一名工作人员使用第三方API,每个IP仅允许一个连接,因此我只能及时为该工作人员执行一项任务。 最佳答案 正确的解决方案是Ap
我正在尝试使用Go建立一个简单的TLS连接,RabbitMQ在尝试创建启用了TLS的连接(Go客户端)时报告了这个问题:rabbitmq_1|2018-04-1613:37:54.146[error]**Statemachineterminatingrabbitmq_1|**Lastevent={{call,{,#Ref}},{new_user,}}rabbitmq_1|**Whenserverstate={error,"tls_connection:format_status/2crashed"}rabbitmq_1|**Reasonfortermination=error:func