草庐IT

springboot集成rabbitMQ-消费者

被代码耽误的段子手 2023-07-24 原文

作为rabbitMQ的生产者,发送消息到MQ的过程中,是通过routingkey发送给交换机,由交换机进行路由,把信息发送的最终的队列中。而rabbitMQ消费的时候,是要明确指明消费的队列的。

消费模式

rabbitMQ的消费模式分为两种,推模式和拉模式。推模式使用的是Basic.Consume 进行消费,而拉模式通过调用Basic.Get进行消费。推模式用于持续的获取消息,在推模式中,RabbitMQ会不断的推送消息给消费者,不过推送的数量可以通过Basic.Qos进行限制。拉模式可以单条的获取信息。

消费端的确认和拒绝

为了保证消息可以从队列可靠的到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数。

  • 如果autoAck参数为true,RabbitMQ会自动把发送出去的消息置为确认,并且从内存(磁盘)中删除,不关消费者是否真正进行了正确消费。
  • 如果autoAck参数设置为false,RabbitMQ会等待消费者显式地回复确认信号后才从内存或者磁盘中删除。

采用消息确认机制后,只要设置autoAck为false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程断掉导致消息丢失的问题。RabbitMQ会一直等到消费者显式调用Basic.ack命令为止。

Springboot使用amqp进行消费

在springboot中,对消息进行消费有两种方式。

  • 轮询:使用rabbitMQTemplate.receive()等相关方法进行消费,每次消费一条。
  • 注册侦听器:这个方式也是更为灵活,更为常用的。通过@Bean设置监听器端点和@RabbitListener注解的方式实现。

代码演示

 @Test
    public void receive() throws UnsupportedEncodingException {
        Message receive = rabbitTemplate.receive("queue-msg”);//指定队列名称
        System.out.println(new String(receive.getBody(),"utf-8"));
        Object o = rabbitTemplate.receiveAndConvert("queue-msg”);//可以支持类型转换
        System.out.println(o);
    }


这是最简单的消费方式,但是可以看到,虽然简单,功能也很少。
那么更常用的是使用配置监听器的方式

@Component
public class RabbitMQReceiver {

    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("接收到消息:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }


    @RabbitListener(queues = "test.topic",ackMode = "MANUAL")
    @RabbitHandler()
    public void receive2(String msg,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("wqerdf:RabbitMQReceiver"+message);
        Thread.sleep(2000);
        channel.basicQos(2);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}
/**
这里通过RabbitListener注解指定了队列名称,看到queues应该已经想到了,可以指定多个; ackMode指定了消息需要人为的确认
需要注意的是,如果多个消费者方法,每个方法都要有一个RabbitListener
*/

有关springboot集成rabbitMQ-消费者的更多相关文章

  1. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  2. ruby-on-rails - 我如何将 Hoptoad 与 DelayedJob 和 DaemonSpawn 集成? - 2

    我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W

  3. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

  4. 三分钟集成 TapTap 防沉迷 SDK(Unity 版) - 2

    三分钟集成Tap防沉迷SDK(Unity版)一、SDK介绍基于国家对上线所有游戏必须增加防沉迷功能的政策下,TapTap推出防沉迷SDK,供游戏开发者进行接入;允许未成年用户在周五、六、日以及法定节假日晚上8:00-9:00进行游戏,防沉谜时间段进入游戏会弹窗进行提示!开发环境要求:Unity2019.4或更高版本iOS10或更高版本Android5.0(APIlevel21)或更高版本🔗Unity集成Demo参考链接🔗UnityTapSDK功能体验APK下载链接二、集成前准备1.创建应用进入开发者后台,按照提示开始创建应用;2.开通服务在使用TDS实名认证和防沉迷服务之前,需要在上面创建的应

  5. ruby-on-rails - RailsTutorial - 第 8.4.3 章 - 在集成测试中添加用户后未清除测试数据库 - 2

    我被这个难住了。到目前为止教程中的一切都进行得很顺利,但是当我将这段代码添加到我的/spec/requests/users_spec.rb文件中时,事情开始变得糟糕:describe"success"doit"shouldmakeanewuser"dolambdadovisitsignup_pathfill_in"Name",:with=>"ExampleUser"fill_in"Email",:with=>"ryan@example.com"fill_in"Password",:with=>"foobar"fill_in"Confirmation",:with=>"foobar"cl

  6. ruby - 在 Ruby 中实现生产者消费者模式 - 2

    假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来

  7. ruby-on-rails - 将 Angular JS 与 Rails 集成 - 2

    我需要一些指导来了解如何将Angular整合到rails中。选择Rails的原因:我喜欢他们偏执的做事方式。还有迁移,gem真的很酷。使用angular的原因:我正在研究和寻找最适合SPA的框架。Backbone似乎太抽象了。我不得不在Angular和Ember之间做出选择。我首先开始阅读Angular,它对我来说很有意义。所以我从来没有去读过关于ember的文章。使用Angular和Rails的原因:我研究并尝试使用小型框架,例如grape、slim(是的,我也使用php)。但我觉得需要坚持项目的长期范围。我个人喜欢用Rails的方式做事。这就是我需要帮助的地方,我在Rails4中有

  8. ruby - 在 Maven 集成中运行 Ruby 单元测试 - 2

    有没有人有在Maven中运行用Ruby编写的单元测试的经验。任何输入,如要使用的库/maven插件,将不胜感激!我们已经在使用Maven+hudson+Junit。但是我们正在引入Ruby单元测试,找不到任何同样好的组合。 最佳答案 我建议让Maven使用ExecMavenPlugin启动rake测试(exec:exec目标)并使用ci_reportergem生成单元测试结果的XML文件,Hudson、Bamboo等可以读取该文件,以与JUnit测试相同的格式显示测试结果。如果您不需要使用mvntest运行Ruby测试,您也可以只使

  9. ruby - 使用 Gatling 作为集成测试工具 - 2

    目前我有一小套针对我的网络服务器运行的集成测试,它发出请求并断言一些关于响应应该是什么的假设。这些是用Ruby编写的,生成http请求。我一直在看Gatling作为压力测试工具,但我想知道它是否也可以用于集成测试。这样,所有端点请求都可以在压力测试和集成测试中重复使用。我可能在这里失去了一些东西,因为没有RSpec的BDD,但不必两次创建相同的测试。有没有人有这样使用gatling的经验? 最佳答案 您可以使用AssertionAPI并设置验收标准。但是,Gatling不是浏览器,不会运行/测试您的Javascript,因此这种方法

  10. Spring Boot集成ElasticSearach - 2

    文章目录前言一、Elasticsearch版本介绍二、客户端种类三、客户端与版本兼容性四、引入Elasticsearch依赖包五、客户端配置六、Elasticsearch使用前言ElasticSearch是Elastic公司出品的一款功能强大的搜索引擎,被广泛的应用于各大IT公司,它的代码位于https://github.com/elastic/elasticsearch,目前是一个开源项目。ElasticSearch公司的另外两个开源产品Logstash、Kibana与ElasticSearch构成了著名的ELK技术栈。。他们三个共同形成了一个强大的生态圈。简单地说,Logstash负责数据

随机推荐