文章目录
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

MQTT Broker可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。服务器分发消息,因此必须是发布者,但绝不是订阅者!客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。
客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。
以下是一个简单的MQTT的应用场景,具体如下图所示:

MQTT协议的版本有MQTT v3.1.1/3.1,最新的是MQTT5.0(后续以5.0协议进行演示)。除标准版外,还有一个简化版MQTT-SN,该协议主要针对嵌入式设备,这些设备一般工作于TCP/IP网络,如:ZigBee。MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。

为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:
服务质量是个老话题了。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。级别1提供的至少一次语义在日志处理这种场景下是完全OK的,所以像Kafka这类的系统利用这一特点减少确认从而大大提高了并发。级别0适合鸡肋数据场景,食之无味弃之可惜,就这么着吧。
整体MQTT的消息格式如下图所示;

固定头存在于所有MQTT数据包中,其结构如下:

MQTT消息类型 / message type
位置:byte 1, bits 7-4。
4位的无符号值,类型如下:
名称 值 流方向 描述
标识位 / DUP
位置:byte 1, bits 3-0。
在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:
数据包 标识位 Bit 3 Bit 2 Bit 1 Bit 0
DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为 1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。
QoS发布消息的服务质量(前面已经做过介绍),即:保证消息传递的次数
00:最多一次,即:<=1
01:至少一次,即:>=1
10:一次,即:=1
11:预留
RETAIN:发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。
剩余长度(Remaining Length)
位置:byte 1。
固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0
MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识:
Bit 7 — 0
很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:
PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、
SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK
Payload消息体是MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体:
目前MQTT代理的主流平台有下面几个:
Mosquitto:https://mosquitto.org/
VerneMQ:https://vernemq.com/
EMQTT:http://emqtt.io/
1、安装服务器端
sudo apt-get install mosquitto
完成安装后,服务器就搭建好了,系统会自动运行mosquitto,默认端口为1883。
2、安装客户端
前面服务器端搭建好了,但是客户端还没有安装。这一步是可选的,如果需要在终端上测试MQTT订阅/发布的通信就需要执行这一步,这里我们也安装上去才有后续的这些测试。
sudo apt install mosquitto-clients
3、启动mosquitto服务
mosquitto -v
-v 详细模式——启用所有日志记录类型。

4、通过service启动/关闭mosquitto服务
sudo service mosquitto start
sudo service mosquitto stop
5、查看运行状态
sudo systemctl status mosquitto


6、查看帮助信息
mosquitto --help

7、关闭mosquitto 服务
8、测试(默认配置)
首先打开三个终端,
1、启动代理服务:mosquitto -v
-v 详细模式 打印调试信息
2、订阅主题:mosquitto_sub -v -t hello
-t 指定订阅的主题,主题为:hello
-v 详细模式 打印调试信息
3、发布内容:mosquitto_pub -t hello -m world
-t 指定订阅的主题,主题为:hello
-m 指定发布的消息的内容
具体运行效果如下图:

采用命令行安装mosquitto简单方便,但是安装的版本往往是旧的版本,不能满足最新的协议要求(比如不支持MQTT5.0协议),因此就需要去官网(https://mosquitto.org/)下载源码,安装需要的新版本(2.0.15支持MQTT5.0),下面简单介绍安装流程。
1、安装mosquitto所需要依赖
sudo apt-get install libssl-dev
sudo apt-get install uuid-dev
sudo apt-get install cmake
2、源码下载
wget http://mosquitto.org/files/source/mosquitto-2.0.15.tar.gz
3、解压源码
tar -zxvf mosquitto-2.0.15.tar.gz
进入源码目录:
cd mosquitto-2.0.15/
4、编译与安装源码
make
sudo make install
上面使用的都是默认配置,如需修改服务器的配置信息需要修改:
mosquitto源码目录下的配置文件mosquitto.conf 或者**/etc/mosquitto/mosquitto.conf**文件
在启动服务器时使用命令:mosquitto -c mosquitto.conf -d(在mosquitto安装目录下)
通过源码下载编译安装,会生成一些供我们使用的测试程序和动态库,用来我们自己进行开发使用。
https://mqttx.app/zh
mqttx提供了命令行+app两种方式实现MQTT测试,下载app后具体教程可参考上述网站。


源代码及库安装使用教程如下:
https://github.com/eclipse/paho.mqtt.cpp
此存储库包含内存管理操作系统(如 Linux/Posix 和 Windows)上的 Eclipse Paho MQTT C++ 客户端库的源代码。
此代码构建了一个库,使 C++11 应用程序能够连接到 MQTT 代理、向代理发布消息、订阅主题和接收发布的消息。
mqtt_client.hpp
#ifndef __MQTT_CLIENT_TO_CLOUD_HPP__
#define __MQTT_CLIENT_TO_CLOUD_HPP__
#include <mqtt/async_client.h> // mqtt库头文件
#include <mqtt/topic.h> // mqtt库头文件
namespace cloud {
//! Handler on cloud message
using message_handler = std::function<void(const std::string&)>;
class mqtt_client
{
public:
mqtt_client();
~mqtt_client();
void send(const std::string& message);
void set_message_handler(message_handler cb);
private:
// static constexpr const char* BROKER_HOST = "localhost:1883"; //本地测试:mosquitto
static constexpr const char* BROKER_HOST = "broker.emqx.io:1883"; //公共mqtt broker:MQTTX
// static constexpr const char* BROKER_HOST = "124.XXX.XXX.XXX:1883"; //云端测试:mosquitto
private:
mqtt::async_client cli_;
mqtt::topic topic_;
};
} // namespace cloud
#endif
mqtt_client.cpp
#include <iostream>
#include <string>
#include "mqtt_client.hpp"
using namespace std;
namespace cloud {
// mqtt_client类构造函数实现
mqtt_client::mqtt_client()
// 1.The server URI string 2.The client ID string that we provided to the server 3.The MQTT protocol version we're connected at
: cli_(BROKER_HOST, "client", mqtt::create_options(MQTTVERSION_5))
// 1.The client to which this topic is connected 2.The topic name(pub & sub) 3.The default QoS
, topic_(cli_,"haojuhu", 1)
{
//! Handler on connection lost, do reconnect here
cli_.set_connection_lost_handler([this](const string& info) {
std::cout<<"mqtt connection lost <" << info << ">, reconnting"<<std::endl;
cli_.reconnect();
});
//! Handler on connected, it'll subscribe the topic and publish online info
cli_.set_connected_handler([this](const string& info) {
std::cout << "mqtt connected <" << info << ">"<<std::endl;
topic_.subscribe(mqtt::subscribe_options(true)); // client订阅topic[haojuhu]
topic_.publish("online"); // client发布消息"online"至topic[haojuhu]
});
}
//2.mqtt_client类析构函数实现
mqtt_client::~mqtt_client()
{
cli_.disconnect();
cli_.disable_callbacks();
}
/**
* @brief Publish message to topic,发布消息给topic
* @param[in] message The message payload
*/
void mqtt_client::send(const string& message)
{
topic_.publish(message);
}
/**
* @brief Set mqtt message handler,设置mqtt消息处理句柄(也就是函数对象cb)
* @note The mqtt connection will established here
* @param[in] cb The message handler
*/
void mqtt_client::set_message_handler(message_handler cb)
{
//! Set message callback here
cli_.set_message_callback([cb](mqtt::const_message_ptr message)
{
cb(message->get_payload_str()); //执行函数cb
});
//! Set connect options and do connect
auto opts = mqtt::connect_options_builder()
.mqtt_version(MQTTVERSION_5)
.clean_start(true)
.finalize();
cli_.connect(opts);
}
} // namespace cloud
main.cpp
#include <iostream>
#include <string>
#include <map>
#include "mqtt_client.hpp"
using namespace std;
/**
* @brief Handler on message from cloud
* @param[in] data The message payload from cloud
*/
void on_cloud_message(const string& data)
{
std::cout<<"received data is: "<<data<<std::endl;
}
int main(int argc,char **argv)
{
cloud::mqtt_client g_client; //定义一个mqtt客户端
std::cout << "[CLOUD] listen starting"<<std::endl;
g_client.set_message_handler(on_cloud_message); //开启mqtt clinet监听消息,消息处理函数为on_cloud_message
while(1)
{
std::cout << "运行中..."<<std::endl;
this_thread::sleep_for(10s);
g_client.send("online..."); // 确保与mqtt broker server建立连接之后再publish!!!
}
return 0;
}
g++ main.cpp mqtt_client.cpp -lpaho-mqttpp3 -lpaho-mqtt3a
./a.out

上面主要介绍了以下三个部分的内容:
我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复
在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定