草庐IT

RocketMQ 学习

Donleo 2023-03-30 原文

前言

  RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等

本篇文章第一部分属于一些核心概念和工作流程的讲解;第二部分就是纯手动搭建了一套环境;第三部分是基于环境进行测试和集成到SpringBoot

核心概念

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。

  • Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。

  • 生产者:生产消息的一方就是生产者

  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组

  • 消费者:用来消费生产者消息的一方

  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。

  • topic(主题):可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。

  • Tag(子主题):比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。

这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置,这样就可以使得生产者或者消费者更加灵活。

工作流程

通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:

  • Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。

  • Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。

  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份

  • Consumer启动之后也会跟NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

就跟上面的图一样,整体的工作流程还是比较简单的,这里简化了很多概念,主要是为了好理解。

环境搭建

  通过上面分析,我们知道,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统,所以这里不需要搭建,真正要搭建的就是NameServer和Broker,但是为了方便RocketMQ数据的可视化,这里多搭建一套可视化的服务。

搭建过程比较简单,按照步骤一步一步来就可以完成,如果提示一些命令不存在,那么直接通过yum安装这些命令就行。

一、准备

需要准备一个linux服务器,需要先安装好JDK

关闭防火墙

systemctl stop firewalld
systemctl disable firewalld

下载并解压RocketMQ

1、创建一个目录,用来存放rocketmq相关的东西
mkdir /usr/rocketmq
cd /usr/rocketmq
2、下载并解压rocketmq

下载

wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

解压

unzip rocketmq-all-4.7.1-bin-release.zip

如果提示unzip: Command Not Found

通过yum命令安装,如果已经安装了,请忽略

yum install -y unzip zip

看到这一个文件夹就完成了

然后进入rocketmq-all-4.7.1-bin-release文件夹

cd rocketmq-all-4.7.1-bin-release

RocketMQ的东西都在这了

二、搭建NameServer

在启动NameServer之前,强烈建议修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,当然,如果你的内存足够大,可以忽略。

vi bin/runserver.sh

修改画圈的这一行

 可以设置小一点

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m

启动NameServer

修改完之后,执行如下命令就可以启动NameServer了

nohup sh bin/mqnamesrv &

查看NameServer日志

tail -f ~/logs/rocketmqlogs/namesrv.log

如果看到如下的日志,就说明启动成功了

 

关闭NameServer

sh /bin mqshutdown namesrv

三、搭建Broker

这里启动单机版的Broker

修改jvm参数

跟启动NameServer一样,也建议去修改jvm参数

vi bin/runbroker.sh

将画圈的地方设置小点,当然也别太小啊

 

 可以这样设置

-server -Xms1g -Xmx1g -Xmn512m

修改Broker配置文件broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册

vi conf/broker.conf

Broker配置文件

这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟前面说的对上了。

在文件末尾追加地址

namesrvAddr = localhost:9876

因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。

不过这里我还建议再修改一处信息,因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能你的电脑无法访问到这个自动获取的ip,所以我建议手动指定你的电脑可以访问到的服务器ip。

我的虚拟机的ip是192.168.3.158,所以就指定为192.168.3.158,如下

brokerIP1 = 192.168.3.158
brokerIP2 = 192.168.3.158

开启自动创建Topic

autoCreateTopicEnable = true

如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

启动Broker

nohup sh bin/mqbroker -c conf/broker.conf &

-c 参数就是指定配置文件

查看日志

tail -f ~/logs/rocketmqlogs/broker.log

当看到如下日志就说明启动成功了

关闭Broker

sh /bin mqshutdown broker

查看Broker 与NameServer是否运行

jps

 说明Broker与NameServer是运行状态

四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。

可视化服务其实就是一个jar包,启动就行了。

jar包可以从这获取

链接:https://pan.baidu.com/s/16s1qwY2qzE2bxR81t5Wm6w
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里

然后进入/usr/rocketmq下,执行如下命名

nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &

rocketmq.config.namesrvAddr就是用来指定NameServer的地址的

查看日志

tail -f ~/logs/consolelogs/rocketmq-console.log

当看到如下日志,就说明启动成功了

然后在浏览器中输入http://linux服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭

 

通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。

功能很多,这里就不一一介绍了。

测试

环境搭好之后,就可以进行测试了。

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

生产者发送消息

    @Test
    public void sendTest() throws Exception{
        //创建一个生产者,指定生产者组为ldProducer
        DefaultMQProducer producer = new DefaultMQProducer("ldProducer");

        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.3.158:9876");
        // 第一次发送可能会超时,我设置的比较大
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为 ldTopic
        // 消息内容为 java学习日记
        // tags 为 TagA
        Message msg = new Message("ldTopic", "TagA", "java学习日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为ldProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为三友的java日记的消息,然后指定这个消息往ldTopic这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者

消费者消费消息

public class ConsumerMsg {
    public static void main(String[] args) throws Exception {
        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ldConsumer");
        consumer.setNamesrvAddr("192.168.3.158:9876");
        // 订阅这个topic下的所有的消息
        consumer.subscribe("ldTopic", "*");
        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
  • 创建一个消费者实例对象,指定消费者组为ldConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 ldTopic 这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息

 

再去看控制台,已消费

 

SpringBoot环境下集成RocketMQ

集成

在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。

1、引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>

2、yml配置

rocketmq:
  producer:
    group: ldProducer
  name-server: 192.168.3.158:9876

3、创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

@Component
@RocketMQMessageListener(consumerGroup = "ldConsumer", topic = "ldDelayTaskTopic")
@Slf4j
public class LdRocketMQListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("获取到延迟任务消息:{}",msg);
    }
}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类

4、测试

@RestController
@Slf4j
public class RocketMQDelayTaskController {

    @Resource
    private DefaultMQProducer producer;

    @GetMapping("/rocketmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        Message msg = new Message("ldDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setDelayTimeLevel(2);
        // 发送消息并得到消息的发送结果,然后打印
        log.info("提交延迟任务");
        producer.send(msg);
    }

}

 

可能遇到的问题

搭完mq单主单从集群之后,美滋滋想发一下message, 没想到碰到一个坑爹的问题:

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.90 CQ:  0.90 INDEX:  0.90, maybe your broker machine memory too small.
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [549]ms, Topic: ldTopicA, BrokersSent: [broker-a, broker-a, broker-a]
See http://rocketmq.apache.org/docs/faq/ for further details.

    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:665)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
    at com.example.delay.MQTest.sendTest(MQTest.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
... Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.90 CQ: 0.90 INDEX: 0.90, maybe your broker machine memory too small. For more information, please visit the url, http://rocketmq.apache.org/docs/faq/ at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:665) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:505) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:487) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:431) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:854) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:584)

看报错应该是磁盘空间不足的问题,看到一个帖子https://bbs.csdn.net/topics/392568834,还挺符合的,虽然给出的解决方案说的没那么详细,但是值得一试。

查看磁盘空间

 

已用91%,查阅百度之后发现rocketmq源码的DefaultMessageStore类里,默认会把剩余磁盘的比率不足75%(rocketmq版本不同这个比率好像不一样)当做磁盘空间不足处理,看来磁盘是有点不够了。

先cd到rocketmq配置文件的路径,我这里配置的是双主双从同步的模式,所以cd到配置文件(根据配置的不同文件夹的路径不一样,但都在/conf下)。

  1. cd rocketmq-all-4.7.1-bin-release/conf/2m-2s-sync/
  2. vim broker-a.properties
  3. 在最后加一行diskMaxUsedSpaceRatio=99(所有节点的配置文件都加一下),表示剩余磁盘比例不足99才报错
  4. :wq 保存退出

  5. 重启mq

 重新发送消息Ok了

 

 

有关RocketMQ 学习的更多相关文章

  1. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  2. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  3. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  4. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  5. ruby - 我如何学习 ruby​​ 的正则表达式? - 2

    如何学习ruby​​的正则表达式?(对于假人) 最佳答案 http://www.rubular.com/在Ruby中使用正则表达式时是一个很棒的工具,因为它可以立即将结果可视化。 关于ruby-我如何学习ruby​​的正则表达式?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/1881231/

  6. 深度学习12. CNN经典网络 VGG16 - 2

    深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG

  7. 机器学习——时间序列ARIMA模型(四):自相关函数ACF和偏自相关函数PACF用于判断ARIMA模型中p、q参数取值 - 2

    文章目录1、自相关函数ACF2、偏自相关函数PACF3、ARIMA(p,d,q)的阶数判断4、代码实现1、引入所需依赖2、数据读取与处理3、一阶差分与绘图4、ACF5、PACF1、自相关函数ACF自相关函数反映了同一序列在不同时序的取值之间的相关性。公式:ACF(k)=ρk=Cov(yt,yt−k)Var(yt)ACF(k)=\rho_{k}=\frac{Cov(y_{t},y_{t-k})}{Var(y_{t})}ACF(k)=ρk​=Var(yt​)Cov(yt​,yt−k​)​其中分子用于求协方差矩阵,分母用于计算样本方差。求出的ACF值为[-1,1]。但对于一个平稳的AR模型,求出其滞

  8. Unity Shader 学习笔记(5)Shader变体、Shader属性定义技巧、自定义材质面板 - 2

    写在之前Shader变体、Shader属性定义技巧、自定义材质面板,这三个知识点任何一个单拿出来都是一套知识体系,不能一概而论,本文章目的在于将学习和实际工作中遇见的问题进行总结,类似于网络笔记之用,方便后续回顾查看,如有以偏概全、不祥不尽之处,还望海涵。1、Shader变体先看一段代码......Properties{ [KeywordEnum(on,off)]USL_USE_COL("IsUseColorMixTex?",int)=0 [Toggle(IS_RED_ON)]_IsRed("IsRed?",int)=0}......//中间省略,后续会有完整代码 #pragmamulti_c

  9. ruby-on-rails - 这个 C 和 PHP 程序员如何学习 Ruby 和 Rails? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我来自C、php和bash背景,很容易学习,因为它们都有相同的C结构,我可以将其与我已经知道的联系起来。然后2年前我学了Python并且学得很好,Python对我来说比Ruby更容易学。然后从去年开始,我一直在尝试学习Ruby,然后是Rails,我承认,直到现在我还是学不会,讽刺的是那些打着简单易学的烙印,但是对于我这样一个老练的程序员来说,我只是无法将它

  10. jquery - 使用 Rails 3 学习 Ajax 的资源 - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭10年前。有没有学习Ajax(jQuery)和Rails3的好资源?

随机推荐