1、强烈建议查看官方文档,说的是真的不错,浅显易懂:EMQ 提供了通俗易懂的技术文章,帮助开发者快速了解 MQTT 协议及其相关特性
2、在我使用到现在,对比了很多MQTT broker,EMQX 几乎是完美的,只有一点限制,那就是开源版本不支持消息持久化
3、当然直接使用官方编译好的方式,启动也是不错的,很简单,也是很推荐的,官方文档的安装步骤一栏就有,记得选好版本欧
官方安装文档地址
点击上面箭头的链接,就会跳转了
1、这里安装使用docker安装,版本为4.4.9,目前为止版本已经到5.x,但是本着先不用最新版本的原则,就先使用4.x版本
官网文档:4.4版本中文官方文档,左下角有切换版本和语言的按钮,选择对应的版本就好
2、从 dockerHub中找到对应的镜像版本 emqx/emqx
3、我就使用4.4.9版本的镜像就🆗。

4、建立对应的数据挂载目录,至于为什么挂载这些目录,是因为这些目录比较重要,可以看官方的目录解释
mkdir -p /data/docker/emqx/{etc,lib,data,log}


5、先运行一个demo 的emqx,就是没有挂载的镜像版本,为了将对应的目录文件copy出来,因为等下如果挂载,会读取挂载目录的数据,所以我们又不知道要那些数据,所以先跑一个demo,把对应文件数据copy出来,后面用来挂载即可
对应端口说明,如下图,其中挂载一个时间是因为,我发现这个镜像的时区不对,比我们少8个小时,所以挂载下时区,就可以保证和系统是一致的了
docker run -d --name emqx -v /etc/localtime:/etc/localtime -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.4.9


6、将对应的问价copy到我们前面建立的目录中,copy完成之后,就会发现我们的目录有数据了,这都是默认的基础配置,后面就可以用它们来挂载
docker cp emqx:/opt/emqx/etc /data/docker/emqx
docker cp emqx:/opt/emqx/lib /data/docker/emqx
docker cp emqx:/opt/emqx/data /data/docker/emqx
docker cp emqx:/opt/emqx/log /data/docker/emqx

7、修改目录权限
chown -R 1000:1000 /data/docker/emqx/
chmod -R 755 /data/docker/emqx/

8、删除不用的,建立完整的镜像
# 删除前面那个demo
docker stop emqx && docker rm emqx
# 建立新的
docker run -d --name emqx --restart=always \
-p 1883:1883 \
-p 8883:8883 \
-p 8083:8083 \
-p 8084:8084 \
-p 8081:8081 \
-p 18083:18083 \
-v /etc/localtime:/etc/localtime \
-v /data/docker/emqx/etc:/opt/emqx/etc \
-v /data/docker/emqx/lib:/opt/emqx/lib \
-v /data/docker/emqx/data:/opt/emqx/data \
-v /data/docker/emqx/log:/opt/emqx/log \
emqx/emqx:4.4.9

9、登录页面,根据前面的那个端口关系知道,18083是dashboard端口,访问即可
http://192.168.172.229:18083,输入默认账号: admin 默认密码:public

10、然后会强制你修改默认的密码,就🆗了,然后用新密码登录

1、这个参考文章 springboot整合mqtt实现消息发送和消费,以及客户端断线重连之后的消息恢复,就行写的很好
1、如上默认任何客户端都可以链接,这肯定是不可以的。
2、官方提供很多认证方式,这里选取较为方便的一种 mnesia认证

3、首先在配置文件中关闭匿名客户端链接,修改我们的挂载的配置文件(镜像里面的配置也会跟着改),然后重启镜像(不然不能生效…)
# 修改下述配置
vim /data/docker/emqx/etc/emqx.conf
# 重启镜像
docker restart emqx



4、再次链接你就会发现无权链接

5、先在控制台开启 mnesia认证


6、增加对应用户,这的话官方说的已经很清楚了 Mnesia 认证,两种方式
7、那我们通过修改配置文件的方式好了,改完之后重启
# 1、修改配置文件,增加用户
vim /data/docker/emqx/etc/plugins/emqx_auth_mnesia.conf
# etc/plugins/emqx_auth_mnesia.conf
## clientid 认证数据
auth.client.1.clientid = thermal
auth.client.1.password = TAUb&LjCdYB#EjAMNB
## username 认证数据
auth.user.2.username = thermal
auth.user.2.password = TAUb&LjCdYB#EjAMNB
# 2、重启
docker restart emqx


至于为啥有clientId和userName,两种,我建议是两个都配置,并且保持一致,当然你也可以只配置
userName的方式也行,我已经测试过了。
8、最后就可以了,使用最新的账号去链接。注意那个admin账号是登录控制台的,和客户端链接认证没有关系,客户端链接得写后面我们增加的用户 thermal


- S Y S / b r o k e r s / SYS/brokers/ SYS/brokers/{node}/clients/${clientid}/connected
- S Y S / b r o k e r s / SYS/brokers/ SYS/brokers/{node}/clients/${clientid}/disconnected
web_hook来实现 参考文章 MQTT EMQX中如何监听客户端上下线?,但是这个文章中说web_hook没有问题是不对的,比如,我们提供的web_hook接口的服务需要重启,而重启这段时间内,其他客户端上下线的数据,我们就拿不到了。


1、意思就是,消息已经发布到emqx服务器了,但是此时接收者不在线,而我们重启emqx服务后,再让接收者上线,前面那一部分数据,接收者是收不到的,也就是丢失了。 文章说明参考 基于emq x开源版实现服务重启后主题和消息恢复的完整方案(二),开源版本不持支数据持久化。

2、付费版本是提供了很多方式的数据持久化的,参考官方文章EMQX 插件持久化系列 (五)MySQL MQTT 数据存储
注意这里的消息持久化,是指普通消息重启之后也会在(即离线消息),官方有提供一个插件,
emqx_retainer,但是这个只是持久化保留消息的,每次重启都会发给订阅者 ,关于保留消息的概念,可以参考 官方文档 保留消息
大家要区分出来
离线消息,消息未送到订阅端,保存在emqx服务器中,但是emqx服务重启了和保留消息(只有保存最新的一次消息)的区别,下面这个配置是配置保留消息的持久化的,所以和持久化离线消息是没有的。

1、这篇文章说的很不错,参考链接 SpringBoot 开发之 MQTT 协议客户端断线后自动重连与保留断线时发布的主题消息,可以直接看这篇,下面的就是它的解决方案。
2、如果有一个客户端程序已经用emqx很久了,突然emqx服务需要重启,那么这个客户端程序就会抛出一个异常,说被迫断开了一个链接,即使emqx服务重启好了之后,这个客户端程序还是不可以使用,除非你重启这个客户端程序,再次重连,所以我们需要做的是,不重启客户都安程序,当emqx服务OK的时候,自动重连。
设置参数AutomaticReconnect 为true,但是 使用 automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;
所以还要在链接完成后,再次订阅主题
@Override
public void connectComplete(boolean b, String s) {
// 客户端连接成功
CodeUtils.info("[MQTT] 连接成功,重新订阅主题...");
try {
client.subscribe(topic, QOS);
} catch (MqttException e) {
e.printStackTrace();
}
}
上面代码等同于,我们自己在链接成功的时候去订阅主题,主题可以重复订阅
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("platform端 链接成功,是否是reconnect的结果: {},serverURI: {}", reconnect, serverURI);
// 以testtopic/#结尾表示订阅所有以testtopic开头的主题
// 手动订阅之后,messageArrived() 方法就收不到消息了,都在这里
mqttPlatformClient.subscribe(TopicConstant.UPPER_MACHINE_2_PLATFORM, 2, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("platform端 订阅消息回调函数topic: " + topic + " ,msg: " + message.toString());
}
});
}
两种方式二选一即可。
1、我们假设emqx服务是🆗的,那么如果一个客户端服务,需要重启更新版本,那么,我希望更新版本之后,这个客户端还能收到掉线这段时间的数据,这个很简单,emqx服务已经实现了,需要设置以下两点
2、但是EMQX服务保存的数据量也是有配置的,这里说的是将离线消息保存在内存中,即EMQX服务没有重启(开源版本不支持离线消息持久化,前面说了),如下图,我们知道,离线保存消息在内存中(也就是飞行窗口和消息队列)只会保存5分钟,且最大也才1000条,修改参数,可以在全局参数文件emqx.conf中修改

3、修改时间参数和队列存储长度的参数,可以看 4.5小节,里面举例就是按照这个参数举例的。
# 队列长度,默认1000,设置为0不限制
zone.external.max_mqueue_len = 1000
# 会话默认超时时间, 这个其实就是在离线消息的保存时间 2h = 120min = 7200s
zone.external.session_expiry_interval = 2h
# 注意并不是 zone.external.await_rel_timeout 这个参数
4、其实我们搭建完成 Dashboard后,也是能看出来一些的。


1、 我们使用默认的配置,不修改参数zone.external.max_mqueue_len和zone.external.session_expiry_interval,直接通过EmqxPlatformApplication :7093/应用向队列里面发送2w条消息
这个实验得在2h内完成,因为我们没有修改
zone.external.session_expiry_interval参数
2、 为了方便观察,我们使用控制台,监控该队列的数据流入流出情况,目前为止都是 0

3、 该图是mock数据的接口,我们直接请求该接口,传参为 20000

4、 发送结束,我们看到监控台,已经流入了2w的数据了,因为消费应用EmqxUpperMachineApplication,还没有启动,所以这些消息,会变为离线消息


5、启动消费客户端发现只有 19000-20000的数据,其他数据丢失


6、我们观察日志可以看到,日志中写了,队列已经满了,所以放不下,我们计算下数据的条数就知道,从第五行开始,加上19000,就是说会到19004行,正好 19000条被遗弃掉,所以队列长度,默认为1000
2022-12-09T10:09:27.513000+08:00 [warning] upperMachine@127.0.0.1:53739 [Session] Dropped msg due to mqueue is full: Message(Id=0005EF5BA46867AAF4420000020B0001, QoS=2, Topic=test/thermal/platform_2_upper_machine, From=<<"platform">>, Flags=[], Headers=#{peerhost => {127,0,0,1}, properties => #{},proto_ver => 3,protocol => mqtt,username => <<"thermal">>})


7、所以,如果不限制的话,可以设置为0,则20000条数据就都会在,修改之后,我们也能在控制台看到

1、我们使用默认的配置,不修改参数zone.external.max_mqueue_len,修改zone.external.session_expiry_interval设置为 20分钟,控制台页面也能看到修改的参数值
# 为啥设置那么长时间,是为了和其他参数区分开来,更好的验证,我们过去20分钟就去验证即可。
zone.external.session_expiry_interval = 20m


2、 直接通过EmqxPlatformApplication :7093/应用向队列里面发送10条消息即可,因为我们是验证已经存储的消息的保留时间,不需要太多数据
监控如下.

过20多分钟后 启动客户端发现没有之前的10条数据
查看日志发现已经丢弃
1、这个很重要,我强烈建议你去读取一遍,了解一下有那些参数可以配置,其中有写参数非常重要,
EMQX- V4.4 官方参数文档地址

2、这里我举几个比较重要的参数,比如4.4小节中提到的飞行窗口的参数,看如下图,可得知,离线消息先到飞行窗口里面,再到meassge_queue中,两个的参数分别为 zone.内部/外部.max_inflight 和 max_mqueue_len,并且这些离线数据存储也是有时限的,第四小节中的那张图片中有说明,其实那个就是EMQX的默认配置
含义解释如下:
max_mqueue_len默认值是1000,要是希望不限制,则改为0即可,如果你设置为1000,但是有3000条消息,那么这个队列里面只会保留2000-3000这个一千条最新的数据。1到1999的数据会收不到,以上面的参数,设置离线消息的保留时间。
max_inflight
max_inflight
1、官方说明,如果版本大约4.3.3,还使用docker部署,则自能通过 docker logs 查看,或者如下配置,官方说明 控制日志输出
1、接受队列的客户端一直不在,则会丢弃消息,举一个例子,如下,现有客户端
2、有队列platform_2_upper_machine,则platform上线,向队列platform_2_upper_machine发生消息,但是客户端upperMachine没有上线过或者上线过后,session已经被清除了
什么时候被清除,就是前面说的那个参数
zone.external.session_expiry_interval,下面这个图设置的值是一天,也就是86400秒。这个参数确实可以是保存离线消息的时间,但是提前是你之前有登录过emqx服务器,即下面的记录中有你,也就是在zone.external.session_expiry_interval时间内,存在,一旦这个时间期限一过,而你还是没有登录,则会自动清除,离线消息也就没有了

3、所以这个时候你向队列platform_2_upper_machine发送消息,会直接被遗弃,因为没有 zone.external.session_expiry_interval时间范围内存活的客户端来消费这个队列。

根据第四小节的说明,我们可以知道要修改EMQX的服务参数
修改web_hook的地址,来监听客户端的上下线问题
修改消息队列大小,不用修改飞行窗口的大小 ,修改参数zone.external.max _mqueue_len为0,表示不限制,注意是**外部域(external)**的参数
修改离线消息的保存时间可以根据线上环境来考虑zone.external.session_expiry_interval
还有一个部署时候的日志bug,如果版本大约4.3.3,还使用docker部署,则自能通过 docker logs 查看,或者如下配置,官方说明 控制日志输出
dashboard的账号密码1、登录dashboard的账号密码,默认是admin/public,这个修改的方式很简单,直接到用户模块进行修改即可,这个修改的作用只是用于登录dashboard,无任何其他作用

1、这个看第三节 修改客户端链接授权 即可
1、这个官方也给出了说明 HTTP API,用户名和密码哪里配置?


2、添加完成之后密码就是密钥,会自动生成

3、比如现在我需要获取对应的客户端信息通过api接口,账号密码安装前面的生成即可.

我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po