草庐IT

SpringCloud Stream整合RocketMQ

无我_无他_有你 2023-03-28 原文

前言

1.rocketmq 安装可参考:https://www.jianshu.com/p/f3713adfa3dd
2.启动好nameserv 和 broker
3.官方RocketMQ+springcloud stream 例子 https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md

  1. 本文将说明普通消息发送/消费、广播消息发送/消费、延时消息发送消费三种模式

项目环境/依赖:

    <properties>
        <spring-boot-version>2.3.12.RELEASE</spring-boot-version>
        <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
        <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
        <rocketmq.version>2021.1</rocketmq.version>
    </properties>
    !-- Environment START-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>${spring-boot-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>${spring-cloud-alibaba-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>${rocketmq.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.4</version>
    </dependency>

依赖说明:spring-cloud-starter-stream-rocketmq 排除了rocketmq-client、rocketmq-acl依赖是因为我想换成新一点的依赖,不排除也是可以的。

1.普通消息发送

新建模块A用于消息发送
创建一个controller用户测试消息发送

@RestController
public class RocketMqSendMsgController {

    @Autowired
    private StreamBridge streamBridge;

    @PostMapping(value = "/cluster")
    public void sendClusterMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(CLUSTER_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(CLUSTER_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息集群发送: " + msg.getPayload().getData());
    }
}

yml配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-out-0:
          destination: cluster

配置说明
1.配置name-server服务地址,必须要配置
2.cluster-out-0 :channel 通道名称 默认的一个规则吧 发送消息就是 -out- 这样子

  1. destination: cluster :topic为cluster

附上代码中用到的常量类

package com.ly.tuliy.commons.base.mq;

/**
 * 类说明: mq 常量类
 *
 * @author wqf
 * @date 2022/9/7 9:30
 */
public class MessageConstant {

    //生产者-集群消息主题
    public static String CLUSTER_MESSAGE_OUTPUT="cluster-out-0";
    //生产者-广播消息主题
    public static String BROADCAST_MESSAGE_OUTPUT="broadcast-out-0";
    //生产者-延时消息主题
    public static String DELAYED_MESSAGE_OUTPUT="delayed-out-0";


    //消费者-集群消息主题
    public static String CLUSTER_MESSAGE_INPUT="cluster-in-0";
    //消费者-广播消息主题
    public static String BROADCAST_MESSAGE_INPUT="broadcast-in-0";
    //消费者-延时消息主题
    public static String DELAYED_MESSAGE_INPUT="delayed-in-0";

}

import java.io.Serializable;
import java.util.Map;

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description: mq 发送消息的内容体基础内容
 */
@ToString
public class BaseMessage<T> implements Serializable {
    /**
     * 消息主题
     */
    private String topic;
    /**
     * 消息标签
     */
    private String tag;
    /**
     * 消息内容
     */
    private T data;
    /**
     *
     */
    private Map<String, Object> header;


    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public Map<String, Object> getHeader() {
        return header;
    }

    public void setHeader(Map<String, Object> header) {
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data, Map<String, Object> header) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
    }

    public BaseMessage(String topic,  T data) {
        this.topic = topic;
        this.data = data;
    }

    public BaseMessage() {
    }
}

新建模块B用于消息消费
创建一个类接收消息

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description:
 */
@RestController
public class RocketMqReceiveMsgController {

    @Autowired
    private StreamBridge streamBridge;

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> cluster() {
        return message -> {
            System.out.println("接收的集群消息为:" + message);
        };
    }

yml配置

server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group

配置说明:
1.definition: cluster 消费者端配置,这里配置的cluster 必须和我们接收消息类中的方法名称一致


image.png

2.cluster-in-0:也是默认的规则 -in- 标识接收消息
3.group:消费组名称配置 ,这个一定要配,名称命名没有要求

测试:
用postman在生产者端(A)发送消息,消费端(B)能正常接收到消息。将消费端B多启动几个端口,创建多消费者环境,此时我们发送消息可以观测到消息将随即被几个消费者消费,一个消息只会被消费一次

出现的问题: 消息接收不到或者是报错,请先检查下主题是否创建(rocketmq 控制台看看),或者启动broker时修改配置为自动创建主题。

2.广播消息发送

生产者(A)controller添加测试接口

    @PostMapping(value = "/broadcast")
    public void sendBroadcastMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(BROADCAST_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(BROADCAST_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息广播发送: " + msg.getPayload().getData());
    }

消费者端(B)添加以下配置

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> broadcast() {
        return message -> {
            System.out.println("接收的广播消息为:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster;broadcast
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否开启广播消息 默认为false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group

配置说明:
1.consumer.broadcasting: true 该配置默认是false,true表示开启广播消费

测试:
启动多个消费者,发送消息时,每个消费者都能接收到每条生产者的消息

3.延时消息发送

生产者(A)controller添加测试接口

    @PostMapping(value = "/delayed")
    public void sendDelayedMsg(@RequestParam("message") String message) {
        String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

        for (int i = 0; i < 100; i++) {
            String key = "KEY" + i;
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_KEYS, key);
            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
            // 设置延时等级1~10
            headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4);
            BaseMessage<String> baseMessage = new BaseMessage<>(MessageConstant.DELAYED_MESSAGE_OUTPUT, message);
            baseMessage.setHeader(headers);
            Message<BaseMessage<String>> msg = new GenericMessage<>(baseMessage, headers);
            streamBridge.send(MessageConstant.DELAYED_MESSAGE_OUTPUT, msg);
            System.out.println(Thread.currentThread().getName() + " 延时消息: " + msg.getPayload().getData());
        }
    }

参数说明:
messageDelayLevel :延时有18个等级(我试了前4个等级),每个等级延时时间如代码

yml添加配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          delayed-out-0:
            producer:
              group: delayed-group
              sync: true
      bindings:
        cluster-out-0:
          destination: cluster
        broadcast-out-0:
          destination: broadcast
        delayed-out-0:
          destination: delayed

配置说明:
bindings.delayed-out-0.producer.sync=true 该项配置只在生产端配置,表示消息发送通道delayed-out-0开启消息异步发送,一定要有,不然延时消息没效果

消费者端(B)添加以下配置

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> delayed() {
        return message -> {
            System.out.println("接收的延时消息为:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster;broadcast;delayed
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否开启广播消息 默认为false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group
        delayed-in-0:
          destination: delayed
          group: delayed-group

有关SpringCloud Stream整合RocketMQ的更多相关文章

  1. ruby - 如何将 Interactive Ruby 整合到我的开发过程中? - 2

    我正在尝试找到一种更好的方法将IRB与我的常规ruby​​开发集成。目前我很少在我的代码中使用IRB。我只用它来验证语法或尝试一些小的东西。我知道我可以将我自己的代码加载到ruby​​中作为一个require'mycode'但这通常不符合我的编程风格。有时我要检查的变量超出范围或在循环内。有没有一种简单的方法可以启动我的脚本并在IRB内的某个点卡住?我想我正在寻找一种更简单的方法来调试我的ruby​​代码而不破坏我的F5(编译)键。也许有经验的ruby开发者可以和我分享一个更精简的开发方法。 最佳答案 安装ruby​​-debugg

  2. ruby - 使用 Drupal 和 Ruby。有没有人整合两者? - 2

    我开始了一个小型网络项目并使用Drupal来构建它。到目前为止,还不错:您可以快速建立一个不错的面向CMS的网站,通过模块添加社交功能,并且您有一个广泛的API可以在一个架构良好的平台中进行自定义。现在问题来了:网站的增长超出了最初的计划,我发现自己正处于认真开始为它编写代码的境地。由于Drupal项目,我对PHP有了新的认识,但我想用Ruby来做。我会感觉更舒服,以后维护起来更容易,我可以在其他Ruby/Rails应用程序中重用它。随着时间的推移,我想我会用Ruby重写Drupal中的现有部分。基于此,问题是:是否有人将两者(成功或失败的故事)结合起来?这是一个相当大的决定,但我在G

  3. 优化大数据量查询方案——SpringBoot(Cloud)整合ES - 2

    一、Elasticsearch简介实际业务场景中,多端的查询功能都有很大的优化空间。常见的处理方式有:建索引、建物化视图简化查询逻辑、DB层之上建立缓存、分页…然而随着业务数据量的不断增多,总有那么一张表或一个业务,是无法通过常规的处理方式来缩短查询时间的。在查询功能优化上,作为开发人员应该站在公司的角度,本着优化客户体验的目的去寻找解决方案。本人有幸做过Tomcat整合solr,今天一起研究一下当前比较火热的Elasticsearch搜索引擎。Elasticsearch是一个非常强大的搜索引擎。它目前被广泛地使用于各个IT公司。Elasticsearch是由Elastic公司创建。它的代码位

  4. 若依整合Easy-Es实现文章列表分页查询 - 2

    Easy-Es(简称EE)是一款基于ElasticSearch(简称Es)官方提供的RestHighLevelClient打造的ORM开发框架,在RestHighLevelClient的基础上,只做增强不做改变,为简化开发、提高效率而生,您如果有用过Mybatis-Plus(简称MP),那么您基本可以零学习成本直接上手EE,EE是MP的Es平替版,在有些方面甚至比MP更简单,同时也融入了更多Es独有的功能,助力您快速实现各种场景的开发。目录1、ES的优点2、整合过程(1)配置文件(2

  5. springboot2.6.4整合elasticsearch7.15.2 - 2

    之前跟着老师的视频安装了一个es6.4.3,视频里说“6->7类型逐渐被废弃,版本变化较大,与springboot不兼容,建议安装6.4.3与内部一致”,我就很天真的跟着一块装了,结果可想而知……我的springboot版本是2.6.4啊,怎么可能不报错呢??!还是要多看官方文件!整合最重要的就是这张图,版本对了什么都好说es对应的springboot版本SpringDataReleaseTrainSpringDataElasticsearchElasticsearchSpringFrameworkSpringBoot2021.2(Raj)4.4.x7.17.45.3.x2.7.x2021.1

  6. 『赠书活动 | 第一期』《分布式中间件核心原理与RocketMQ最佳实践》 - 2

    💗wei_shuo的个人主页💫wei_shuo的学习社区🌐HelloWorld!『赠书活动|第一期』本期书籍:《分布式中间件核心原理与RocketMQ最佳实践》赠书规则:评论区:点赞|收藏|留言评论区留言:"人生苦短,我用Java"活动截止时间:4月23日赠书数量:1Tip:中奖后博主私信通知|三天内不回复将视为|自动放弃书籍介绍分布式中间件核心原理与RocketMQ实战技术一本通:实战案例+操作步骤+执行效果图,手把手教你吃透分布式中间件技术,轻松实现从小白到大牛的职业跃迁!分布式中间件核心原理与RocketMQ实战技术必修宝典!内容简介本书从分布式系统的基础概念讲起,逐步深入分布式系统中间

  7. Nginx生产环境配置、elasticsearch生产环境配置、rocketmq生产环境配置 (史上最全) - 2

    Nginx实现10万+并发在优化内核时,可以做的事情很多,不过,我们通常会根据业务特点来进行调整,当Nginx作为静态web内容服务器、反向代理或者提供压缩服务器的服务器时,期内核参数的调整都是不同的,概述:由于默认的linux内核参数考虑的是最通用场景,这明显不符合用于支持高并发访问的Web服务器的定义,所以需要修改Linux内核参数,让Nginx可以拥有更高的性能;注:本文以PDF持续更新,最新尼恩架构笔记、面试题的PDF文件,请从下面的链接获取:码云参考关键的Linux内核优化参数/etc/sysctl.conf修改/etc/sysctl.conf来更改内核参数修改好配置文件,执行sys

  8. springboot整合mybatis代码快速生成 - 2

    特别说明:本次项目整合基于idea进行的,如果使用Eclipse可能操作会略有不同,不过总的来说不影响。springboot整合之如何选择版本及项目搭建springboot整合之版本号统一管理 springboot整合mybatis-plus+durid数据库连接池springboot整合swaggerspringboot整合mybatis代码快速生成springboot整合之统一结果返回springboot整合之统一异常处理springboot整合之Validated参数校验 springboot整合之logback日志配置springboot整合pagehelper分页springboot

  9. Elasticsearch与spring data整合api变化 - 2

    记录spring-data-elasticsearch版本api变化https://blog.csdn.net/zlpzlpzyd/article/details/128255792springboot2.7.x对应 spring-data-elasticsearch 4.4.x排查问题之前先看一下上述链接中版本的对应关系 org.springframework.data.elasticsearch.core.ElasticsearchTemplate找不到该类,但是有一个类是client包下的 org.springframework.data.elasticsearch.client.elc

  10. javascript - 如何将 Waze 数据与我的网页整合? - 2

    Waze是一个提供实际交通信息的应用程序。还有一个实时mapWazelivemap-如您所见,标记在哪里。所以,我找到了一些网站:Checkthislink-egaraz我真的很好奇egaraz.cz是如何从Waze获取(解析)数据(标记)的,因为没有机会这样做。或者他们有什么交易?我不知道。我试图将所有提到的网页下载到我的计算机上,我阅读了所有javascript文件和所有内容,但一无所获。可能我忽略了一些……你能帮我(试着)告诉我他们是怎么做到的吗?或者建议一些可行的解决方案.. 最佳答案 我对此做了一些调查,发现了一些奇怪的东

随机推荐