草庐IT

RabbitMQ学习笔记02:Hello World!

阿龙弟弟 2023-03-28 原文

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 

 

 

前言

RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在于RabbitMQ投递的是二进制的消息数据。

这里有一些术语需要说明。

发送、产生消息的程序我们称之为生产者producer,使用此图标表示

队列queue就是一个有名字的邮箱,虽然消息会在RabbitMQ和你的应用程序之间流动,但是它们可以持久保存在队列中。队列会收到主机的内存和磁盘容量的限制。多个生产者可以向一个队列发送消息,同时多个消费者consumer也可以从一个队列中消费/接收消息。

队列的图标如下

接收、消费消息的程序我们称之为消费者consumer,使用此图标表示

注意,生产者、消费者以及RabbitMQ不需要位于同一台服务器上。事实上,绝大多数情况中,它们都分别位于不同的服务器上。生产者和消费者可以是同一个程序实现。

这里我们的实验环境,生产者和消费者是独立的程序文件,但是它们以及RabbitMQ都位于同一台服务器上。

 

 

Hello World!

学习RabbitMQ,其中的生产者和消费者需要用户自己使用编程语言来实现。我本人是一名运维工程师,有过bash, awk的经验,2017年学习廖雪峰的Python学了一半,近期又学习了B站高淇老师的Java前三章(包含面向对象),对于Python和Java语言算是一知半解,大概看得懂又写不出来的水平。我自己尝试了下,即使没有编程基础,只要我们严格按照官方的教程指导,也是可以将这些代码实现的。

这部分我们使用python手写生产者和消费者,使用RabbitMQ自带的guest账户。生产者会发送一条简单的消息给到名为hello的队列,消费者从队列会收到这条消息并将其打印出来。

简单的流程图如图所示,队列就有点类似于代表了消费者去接收了这条消息。

我们的操作系统自带了python 3.9,无论是键入python又或者是python3,都是指向python3,这点可以从字符链接里面看出来。

[root@rabbitmq-01 ~]# ls -l $(which python)
lrwxrwxrwx. 1 root root 9 Dec 12 20:51 /usr/bin/python -> ./python3
[root@rabbitmq-01 ~]# ls -l $(which python3)
lrwxrwxrwx. 1 root root 9 Dec 12 20:42 /usr/bin/python3 -> python3.9

我们需要使用到 Pika 库来使得我们的python代码可以连接RabbitMQ,这个库也是官方推荐的。

python -m pip install pika --upgrade

如果没有安装pip的话,可以使用

python -m ensurepip --upgrade

接下来就可以开始正式写python代码了。

Sending

我们的第一个程序send.py会发送一条消息到队列hello中。首先我们需要建立连接。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

连接建立完毕,在发送消息之前,我们需要确保队列hello必须存在,否则我们发送出去的消息就会被丢弃。因此我们创建队列。

channel.queue_declare(queue='hello')

此时我们可以开始发送消息了,我们计划发送消息Hello World!到队列hello中去。

在mq中,消息无法直接发送到队列中去,必须要经过exchange。目前我们还不需要对其进行展开理解,大家直到这么个概念即可,后续在讲解RabbitMQ tutorial - Publish/Subscribe — RabbitMQ的时候会涉及到。

我们现在只需要将其发送给默认的exchange,使用空字符串来识别它。它是一个特殊的exchange,允许我们指定要发送消息的队列,队列使用routing_key参数来指定。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

这里的print只是用来告诉用户我们做了什么。

消息发送完毕了,在我们退出程序前,我们应该确保网络buffer已经被清空,我们的消息真的发出去了。这里通过优雅地关闭连接来实现。

connection.close()

发送到这里就结束了,如果消息没有发送成功的话,可以考虑从日志入手排查问题。

Receiving

第二个程序是消费者receive.py,它将会从队列中接收消息并将其打印出来。

首先我们需要连接到服务器上,这部分代码和生产者代码相同。

接下来,同样我们需要确保队列的存在性。使用queue_declare创建队列是幂等(idempotent)的,即使执行多次也只会创建一个队列。

channel.queue_declare(queue='hello')

之所以在这里重复创建队列,是为了

  • 我们可能不知道生产者和消费者程序,哪个会先运行,因此最好在两端都创建队列。
  • 创建队列的操作需要具备幂等性。

从队列中接收一个消息会更加复杂,它工作方式是订阅一个callback函数到队列上。每当队列中有消息出现的时候,callback函数都会被Pika库调用。在本案例中,callback函数会打印消息在屏幕上。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下来我们告诉mq,这个特定的callback函数需要从队列hello接收消息。这一步需要确保队列已存在,好在我们上面的代码已经确保过了。

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

auto_ack参数将会在RabbitMQ tutorial - Work Queues — RabbitMQ中讲解。

接下来我们进入无限循环,在循环中我们等待队列中一旦出现新的消息,我们就会将其消费掉然后输出消息的内容。

直到用户输入Ctrl+C来停止程序。

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Putting it all together

send.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

把python代码文件放到服务器上,准备开始测试。

首先打开第一个终端,运行消费者程序receive.py,它会占用前端一直运行下去,一旦有消息就会打印,直到来自用户的终止指令Ctrl+C

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C

打开第二个终端,运行生产者程序send.py,每运行一次它都会向队列hello发送一条消息Hello World!同时输出到终端,随后它就会退出,不会占用前端资源。

[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'

此时我们回到消费者终端,它会按照程序中说的消费掉队列中的消息并将其输出。

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

send.py可以反复执行。

[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'

消费者程序都可以捕获。

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'

我们可以通过rabbitmqctl list_queues来查看mq实例中当前存在的队列以及队列中的消息数量。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	0

因为消息在一瞬间就被消费掉了,所以我们看到的消息数量都会是0。

想要退出的话就是在消费中终端执行Ctrl+C

 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
^CInterrupted

 

 

总结

这篇博文讲述了基本的消息队列的工作模型。我们学会了RabbitMQ的基本概念(生产者、消费者和队列),并了解如何使用代码来实现。

有关RabbitMQ学习笔记02:Hello World!的更多相关文章

  1. postman——集合——执行集合——测试脚本——pm对象简单示例02 - 2

    //1.验证返回状态码是否是200pm.test("Statuscodeis200",function(){pm.response.to.have.status(200);});//2.验证返回body内是否含有某个值pm.test("Bodymatchesstring",function(){pm.expect(pm.response.text()).to.include("string_you_want_to_search");});//3.验证某个返回值是否是100pm.test("Yourtestname",function(){varjsonData=pm.response.json

  2. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  3. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  4. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  5. 牛客网专项练习30天Pytnon篇第02天 - 2

    1.在Python3中,下列关于数学运算结果正确的是:(B)a=10b=3print(a//b)print(a%b)print(a/b)A.3,3,3.3333...B.3,1,3.3333...C.3.3333...,3.3333...,3D.3.3333...,1,3.3333...解析:    在Python中,//表示地板除(向下取整),%表示取余,/表示除(Python2向下取整返回3)2.如下程序Python2会打印多少个数:(D)k=1000whilek>1:    print(k)k=k/2A.1000 B.10C.11D.9解析:    按照题意每次循环K/2,直到K值小于等

  6. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  7. ruby - 我如何学习 ruby​​ 的正则表达式? - 2

    如何学习ruby​​的正则表达式?(对于假人) 最佳答案 http://www.rubular.com/在Ruby中使用正则表达式时是一个很棒的工具,因为它可以立即将结果可视化。 关于ruby-我如何学习ruby​​的正则表达式?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/1881231/

  8. 深度学习12. CNN经典网络 VGG16 - 2

    深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG

  9. 机器学习——时间序列ARIMA模型(四):自相关函数ACF和偏自相关函数PACF用于判断ARIMA模型中p、q参数取值 - 2

    文章目录1、自相关函数ACF2、偏自相关函数PACF3、ARIMA(p,d,q)的阶数判断4、代码实现1、引入所需依赖2、数据读取与处理3、一阶差分与绘图4、ACF5、PACF1、自相关函数ACF自相关函数反映了同一序列在不同时序的取值之间的相关性。公式:ACF(k)=ρk=Cov(yt,yt−k)Var(yt)ACF(k)=\rho_{k}=\frac{Cov(y_{t},y_{t-k})}{Var(y_{t})}ACF(k)=ρk​=Var(yt​)Cov(yt​,yt−k​)​其中分子用于求协方差矩阵,分母用于计算样本方差。求出的ACF值为[-1,1]。但对于一个平稳的AR模型,求出其滞

  10. Unity Shader 学习笔记(5)Shader变体、Shader属性定义技巧、自定义材质面板 - 2

    写在之前Shader变体、Shader属性定义技巧、自定义材质面板,这三个知识点任何一个单拿出来都是一套知识体系,不能一概而论,本文章目的在于将学习和实际工作中遇见的问题进行总结,类似于网络笔记之用,方便后续回顾查看,如有以偏概全、不祥不尽之处,还望海涵。1、Shader变体先看一段代码......Properties{ [KeywordEnum(on,off)]USL_USE_COL("IsUseColorMixTex?",int)=0 [Toggle(IS_RED_ON)]_IsRed("IsRed?",int)=0}......//中间省略,后续会有完整代码 #pragmamulti_c

随机推荐