草庐IT

RabbitMQ

wwwljt 2023-04-16 原文

RabbitMQ

Messaging that just works — RabbitMQ

案例

pom.xml

 <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "hello_world",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "HelloWorld~~~~~~~~~~";
        
        // 6、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                "",
                // 路由名称
                "hello_world",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerHelloWorld {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "hello_world",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                "hello_world",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


工作队列

生产者

package com.www.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerWorkQueues {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "WorkQueue",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        for (int i = 0; i < 10; i++) {
            // 发送消息内容
            String body = "WorkQueue~~~~~~~~~~" + i;
            
            // 6、发送消息
            channel.basicPublish(
                    // 交换机名称。简单模式下交换机会使用默认的 ""
                    "",
                    // 路由名称
                    "WorkQueue",
                    // 配置信息
                    null,
                    // 发送消息数据
                    body.getBytes()
            );
        }
        
        
        // 释放资源
        channel.close();
        connection.close();
        
    }
}


消费者: 两个

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerWorkQueues1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建队列 Queue
        /*
         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         参数:
             1. queue:队列名称
             2. durable:是否持久化,当mq重启之后,还在
             3. exclusive:
                 * 是否独占。只能有一个消费者监听这队列
                 * 当Connection关闭时,是否删除队列
                 *
             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
             5. arguments:参数。
         */
        // 如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare(
                // 队列名称
                "WorkQueue",
                // 是否持久化,当mq重启之后,还在
                true,
                // 是否独占。只能有一个消费者监听这队列 当Connection关闭时,是否删除队列
                false,
                // 是否自动删除。当没有Consumer时,自动删除掉
                false,
                // 参数
                null);
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 6、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                "WorkQueue",
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


订阅模式

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerPubSub {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = "test_fanout";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT("direct"),:定向
                //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC("topic"),通配符的方式
                //   HEADERS("headers");参数匹配
                BuiltinExchangeType.FANOUT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                ""
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                ""
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
                "",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue1Name = "test_fanout_queue1";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue1Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerPubSub2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_fanout_queue2";
       
        
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


Routing 路由模式

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing 工作模式
 * <p>
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerRouting {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        String exchangeName = "test_direct";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT("direct"),:定向
                //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC("topic"),通配符的方式
                //   HEADERS("headers");参数匹配
                BuiltinExchangeType.DIRECT,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
         */
        // 队列1
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "error"
        );
        // 队列2
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "info"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "error"
        );
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "waring"
        );
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "日志信息:张三调用了delete方法...警告。。。日志级别:waring...";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
                "waring",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}


消费者 1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_direct_queue1";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}


消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerRouting2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_direct_queue2";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

Topics 通配符模式

生产者

package com.www.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topics 通配符工作模式
 * <p>
 * 生产者:发送消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ProducerTopics {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 5、创建交换机
        /*
           exchangeDeclare(
            String exchange,BuiltinExchangeType type,
            boolean durable, boolean autoDelete,
            boolean internal, Map<String, Object> arguments
            )
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
        */
        // 交换机名称
        String exchangeName = "test_topic";
        channel.exchangeDeclare(
                // 交换机名称
                exchangeName,
                // type:交换机类型
                //   DIRECT("direct"),:定向
                //   FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                //   TOPIC("topic"),通配符的方式
                //   HEADERS("headers");参数匹配
                BuiltinExchangeType.TOPIC,
                // 是否持久化
                true,
                // 内部使用
                false,
                // 参数
                null
        
        );
        
        // 6、创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(
                // 队列名
                queue1Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        channel.queueDeclare(
                // 队列名
                queue2Name,
                // 是否持久
                true,
                // 是否独占
                false,
                //是否自动删除
                false,
                // 参数
                null
        );
        
        // 7、绑定队列和交换机
         /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
         */
        // routing key  系统的名称.日志的级别。
        // * :表示一个单词
        // # :表示0或多个单词
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        // 队列1
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "#.error"
        );
        channel.queueBind(
                // 队列名
                queue1Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "order.*"
        );
        // 队列2
        channel.queueBind(
                // 队列名
                queue2Name,
                // 交换机名
                exchangeName,
                //  routingKey:路由键,绑定规则
                //    如果交换机的类型为fanout ,routingKey设置为""
                "*.*"
        );
    
        
         /*
             basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
         */
        // 发送消息内容
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        // 8、发送消息
        channel.basicPublish(
                // 交换机名称。简单模式下交换机会使用默认的 ""
                exchangeName,
                // 路由名称 :如果交换机的类型为fanout ,routingKey设置为""
                "order.info",
                // 配置信息
                null,
                // 发送消息数据
                body.getBytes()
        );
        
        
        // 9、释放资源
        channel.close();
        connection.close();
        
    }
}

消费者1

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerTopic1 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_topic_queue1";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

消费者2

package com.www.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者:获取消息
 *
 * @author Www
 * @version 1.8
 * @since 2023/2/15  20:41 星期三
 */
public class ConsumerTopic2 {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置参数
        // IP 默认值 localhost
        connectionFactory.setHost("192.168.36.100");
        // 端口 默认 5672
        connectionFactory.setPort(5672);
        // 虚拟机 默认值 /
        connectionFactory.setVirtualHost("/ljt");
        // 用户名 默认值 guest
        connectionFactory.setUsername("ljt");
        // 密码 默认值 guest
        connectionFactory.setPassword("ljt");
        
        
        // 3、创建连接 Connection : 受检异常——> 抛出
        Connection connection = connectionFactory.newConnection();
        
        // 4、创建通道 Channel
        Channel channel = connection.createChannel();
        
        // 队列名
        String queue2Name = "test_topic_queue2";
         /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
         */
        // 5、接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * <p>
             *   回调方法,当收到消息后,会自动执行该方法
             *      1. consumerTag:标识
             *      2. envelope:获取一些信息,交换机,路由key...
             *      3. properties:配置信息
             *      4. body:数据
             * </p>
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume(
                // 队列名称
                queue2Name,
                // 是否自动确认
                true,
                // 回调对象
                consumer
        );
        
        // 不需要关闭资源
    }
}

有关RabbitMQ的更多相关文章

  1. 绝对详细的 RabbitMQ 实践操作手册(一) - 2

    绝对详细的RabbitMQ实践操作手册,看完本系列就够了。一、什么是MQ?1、MQ的概念2、理解消息队列二、MQ的优势和劣势1、优势和作用2、劣势三、MQ的应用场景四、AMQP五、工作原理一、什么是MQ?1、MQ的概念MQ全称MessageQueue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。下面用图来理解异步通信,并阐明与同步通信的区别。同步通信:甲乙两人面对面交流,你一句我一句必须同步进行,两人除此之外不做任何事情异步通信:异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系,消息传给第三方后,两人可以做其他自己想做的事情,当需要获取

  2. javascript - RabbitMQ + Web Stomp 和安全性 - 2

    RabbitMQ+WebStomp很棒。但是,我有一些主题我希望安全为只读或只写。似乎保护这些的唯一机制是使用rabbitmqctl。我可以创建一个虚拟主机,一个用户,然后应用一些权限。然而,这就是Stomp和Rabbit实现开始崩溃的地方。主题采用以下形式:stomp中的/topic/blah,它使用路由键“blah”路由到Rabbit中的“amq.topic”。似乎没有办法为路由key设置权限。似乎:rabbitmqctlset_permissions-pvhostuser".*"".*""^amq\.topic"是我能做的最好的,这仍然是“所有”主题。我也研究过交换,但没有办法在

  3. RabbitMQ如何实现延迟队列 - 2

    1.延迟队列延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。很可惜,在RabbitMQ中并未提供延迟队列功能,但是我们有其他的方式可以实现延迟队列,方法就是TTL+死信队列,组合实现延迟队列的效果。2.什么是TTLTTL,全称TimeToLive,消息过期时间设置。消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。队列过期后,会将队列所有消息全部移除。一个队列中某一个消息过期后,只有消息在队列顶端,才会判断

  4. javascript - 如何在没有nodejs的情况下使用javascript连接到rabbitmq - 2

    我正在使用Titaniumappcelerator实现移动应用程序。在此应用中,我需要使用RabbitMQ实现聊天。我看到了很多关于如何使用节点js连接到RabbitMQ服务器的示例,但在我的例子中它不适用,我找不到任何JavaScript独立客户端。我想知道是否有一种方法可以在没有客户端的情况下连接到RabbitMQ,或者是否有任何JavaScript独立客户端? 最佳答案 是RabbitMQWebSTOMP吗?你在找什么?使用SockJS,即使在旧浏览器中也能正常工作,不需要Node.js或您编写的任何网络服务器端代码。

  5. RabbitMQ安装(发生系统错误5。拒绝访问)解决方案 - 2

    一、先检查一下Eralng是否安装好并配置好环境 新建系统变量名为:ERLANG_HOME变量值为erlang安装地址 双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。检查是否安装好:二、1、管理员运行cmd然后打开RabbitMQ安装目录\sbin2、rabbitmq-service.batremove3、setRABBITMQ_BASE=D:\rabbitmq_server\data(D:\rabbitmq_server\data是我自己自定义的目录,这样就绕过c\user\用中文用户名…这个文件夹,这样就不会有中文路径的问题了)4、rabbitm

  6. go - Go 中的 RabbitMQ : machinery worker cannot to connect to 0. 0.0.0:5672 消息 getsockopt: 连接被拒绝 - 2

    RabbitMQ在wifi网络中运行良好,我在运行ArchLinux的笔记本电脑中设置了RabbitMQ。但是,当我在不同的wifi网络中使用同一台笔记本电脑时,出现以下错误:machinery:worker.go:42:Goingtoretrylaunchingtheworker.Error:Dial:dialtcp0.0.0.0:5672:getsockopt:connectionrefusedmachinery:retry.go:20:Retryinginxseconds我不知道为什么RabbitMQ在一个网络中工作而不在另一个网络中工作。我尝试更改/etc/rabbitmq/r

  7. go - 如何检查 channel 是否仍在 streadway/amqp RabbitMQ 客户端中工作? - 2

    这个问题在这里已经有了答案:HowtodetectdeadRabbitMQconnection?(4个答案)关闭9个月前。我正在使用github.com/streadway/amqp对于我的程序。在重新初始化之前,我应该如何确保用于消费和/或生产的channel仍在工作?例如,在ruby​​中,我可以简单地做:bunny_client=Bunny.new({....})bunny_client.start启动客户端,ifnotbunny_clientorbunny_client.status!=:connected#re-initializetheclient如何使用streadway

  8. go - golang 消费者端超时 rabbitmq - 2

    我正在使用rabbitmq和golang,我在comsumer端创建了一个等待消息的线程,例如rabbitmq示例。我想主线程将等待kill信号并将信号发送到rabbitmq线程,但如果消息未推送到队列中,rabbitmq线程将永远等待,因此它不会处理kill信号消息。我发现rabbitmq在comsumer中有超时,但我刚刚在java中找到了代码,而不是golang。你能帮助我吗?谢谢。 最佳答案 如果我对您的问题的理解正确,您希望能够退出例行处理队列?超时设置在这里不起作用。这是针对服务器/客户端心跳的,只有在任何时候都没有收到

  9. google-app-engine - 谷歌云引擎 : PubSub instead of RabbitMQ - 2

    我的项目采用在GoogleCloud中运行的微服务器架构。我正在考虑从使用RabbitMQ的容器迁移到PubSub引擎。问题是:是否可以一条一条的接收消息?我的代码是用Go和docs编写的说Thecallbackisinvokedconcurrentlybymultiplegoroutines,maximizingthroughput.但是可以调用多少个goroutines呢?如何设置允许的最大值?例如。我的一名工作人员使用第三方API,每个IP仅允许一个连接,因此我只能及时为该工作人员执行一项任务。 最佳答案 正确的解决方案是Ap

  10. ssl - RabbitMQ TLS tls_connection :format_status/2 crashed - 2

    我正在尝试使用Go建立一个简单的TLS连接,RabbitMQ在尝试创建启用了TLS的连接(Go客户端)时报告了这个问题:rabbitmq_1|2018-04-1613:37:54.146[error]**Statemachineterminatingrabbitmq_1|**Lastevent={{call,{,#Ref}},{new_user,}}rabbitmq_1|**Whenserverstate={error,"tls_connection:format_status/2crashed"}rabbitmq_1|**Reasonfortermination=error:func

随机推荐