草庐IT

JAVA实现MQTT通讯介绍

PetterLiu 2023-03-28 原文

JAVA实现MQTT通讯介绍

      MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

这篇文章的开发环境是:

构建工具。Maven
IDE: IntelliJ IDEA
Java:JDK 1.8.0

我们将使用Eclipse Paho Java Client作为客户端,它是Java语言中使用最广泛的MQTT客户端库。

在pom.xml文件中添加以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>

创建一个MQTT连接


MQTT代理Broker

本文将使用基于EMQX Cloud创建的公共MQTT代理。EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。EMQ是国人出产的一个开源Broker,已经用于很多企业生产了,几乎是目前的全能Broker了,文档和资料也非常齐全,但它是用Erlang语言编写的,这是一个不常见的语言。有两个版本2.0和3.0,最大的区别是3.0的集群化更好,支持集群共享订阅功能,2.0只支持本地共享订阅功能。同时3.0支持mqtt5.0,其他的都是一些性能优化。其他的项目有,先列个表,这些已经算是比较优秀的Broker了,分析主要特性:

✔ - 支持

✘ - 不支持

? - 不了解

§ - 支持但做得不好(有限制)


  • EMQ 提供了一个性能测试工具 emqtt-benck,采用 Erlang 编写,适合来做 MQTT Broker 性能测试。

    • 连接:指定连接数、连接速率,测试 MQTT Broker 的连接性能(速率、响应时间、错误数)
    • 订阅:指定连接数、主题数、订阅速率,QoS、测试 MQTT Broker 的订阅性能(速率、响应时间、错误数)
    • 发布:指定连接数、消息发布速率、消息大小、QoS,测试 MQTT Broker 的消息吞吐性能(速率、响应时间、错误数)

服务器的访问信息如下:

代理商:broker.emqx.io
TCP端口: 1883
SSL/TLS端口: 8883
连接
设置MQTT的基本连接参数。用户名和密码是可选的。

String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";

然后创建一个MQTT客户端并连接到经纪人:

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);

说明:

MqttClient:MqttClient提供了一组方法,一旦MQTT动作完成,这些方法就会阻塞并将控制权返回给应用程序。
MqttClientPersistence:代表一个持久的数据存储,用于在飞行中存储出站和入站的消息,使之能够交付给指定的QoS。
MqttConnectOptions:保存控制客户端如何连接到服务器的选项集。下面是一些常用的方法。
setUserName:设置用于连接的用户名。
setPassword: 设置连接时使用的密码。
setCleanSession:设置客户端和服务器是否应该在重新启动和重新连接时记住状态。
setKeepAliveInterval: 设置 "保持生存 "的时间间隔。
setConnectionTimeout:设置连接超时值。
setAutomaticReconnect(设置自动连接):设置如果连接丢失,客户端是否会自动尝试重新连接到服务器。

用TLS/SSL连接
如果你想在TLS/SSL连接中使用自签名证书,请在pom.xml文件中加入bcpkix-jdk15on:

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcpkix-jdk15on</artifactId>
    <version>1.70</version>
</dependency>

然后用以下代码创建SSLUtils.java文件:

package io.emqx.mqtt;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class SSLUtils {
    public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                    final String crtFile, final String keyFile, final String password)
            throws Exception {
        Security.addProvider(new BouncyCastleProvider());

       // load CA certificate
        X509Certificate caCert = null;

       FileInputStream fis = new FileInputStream(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(fis);
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

       while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
       }

       // load client certificate
        bis = new BufferedInputStream(new FileInputStream(crtFile));
        X509Certificate cert = null;
        while (bis.available() > 0) {
            cert = (X509Certificate) cf.generateCertificate(bis);
       }

       // load client private key
        PEMParser pemParser = new PEMParser(new FileReader(keyFile));
        Object object = pemParser.readObject();
        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
        KeyPair key = converter.getKeyPair((PEMKeyPair) object);
        pemParser.close();

       // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init(caKs);

       // client key and certificates are sent to server so it can authenticate
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                new java.security.cert.Certificate[]{cert});
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
               .getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

       // finally, create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

       return context.getSocketFactory();
   }
}

设置选项如下:

String broker = "ssl://broker.emqx.io:8883";
// Set socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);

发布MQTT消息


创建一个PublishSample类,它将发布一个Hello MQTT消息到主题mqtt/test:

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {

   public static void main(String[] args) {

       String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "publish_client";
        String content = "Hello MQTT";
        int qos = 0;

       try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
       options.setKeepAliveInterval(60);
            // connect
            client.connect(options);
            // create message and setup QoS
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // publish message
            client.publish(topic, message);
            System.out.println("Message published");
            System.out.println("topic: " + topic);
            System.out.println("message content: " + content);
            // disconnect
            client.disconnect();
            // close client
            client.close();
       } catch (MqttException e) {
            throw new RuntimeException(e);
       }
   }
}

订阅


创建一个SubscribeSample类,它将订阅主题mqtt/test:

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubscribeSample {
    public static void main(String[] args) {
        String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "subscribe_client";
        int qos = 0;

       try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // connect options
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
       options.setKeepAliveInterval(60);
            // setup callback
            client.setCallback(new MqttCallback() {

               public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost: " + cause.getMessage());
               }

               public void messageArrived(String topic, MqttMessage message) {
                    System.out.println("topic: " + topic);
                    System.out.println("Qos: " + message.getQos());
                    System.out.println("message content: " + new String(message.getPayload()));

              }

               public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------" + token.isComplete());
               }

          });
            client.connect(options);
            client.subscribe(topic, qos);
       } catch (Exception e) {
            e.printStackTrace();
       }
   }
}

MqttCallback:

connectionLost(Throwable cause)。当与服务器的连接丢失时,该方法被调用。
messageArrived(String topic, MqttMessage message)。当一个消息从服务器到达时,该方法被调用。
deliveryComplete(IMqttDeliveryToken token)。当一个消息的传递已经完成,并且所有的确认都已收到时调用。

测试


接下来,运行SubscribeSample来订阅mqtt/test主题。然后运行PublishSample来发布mqtt/test主题上的消息。我们将看到,发布者成功地发布了消息,订阅者也收到了消息:

也可以使用Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
</dependency>


总结


    现在我们已经完成了使用Paho Java客户端作为MQTT客户端来连接到公共MQTT服务器并实现消息发布和订阅。同时也引入Spring Integration介绍。



今天先到这儿,希望对云原生,技术领导力, 企业管理,系统架构设计与评估,团队管理, 项目管理, 产品管管,团队建设 有参考作用 , 您可能感兴趣的文章:
领导人怎样带领好团队
构建创业公司突击小团队
国际化环境下系统架构演化
微服务架构设计
视频直播平台的系统架构演化
微服务与Docker介绍
Docker与CI持续集成/CD
互联网电商购物车架构演变案例
互联网业务场景下消息队列架构
互联网高效研发团队管理演进之一
消息系统架构设计演进
互联网电商搜索架构演化之一
企业信息化与软件工程的迷思
企业项目化管理介绍
软件项目成功之要素
人际沟通风格介绍一
精益IT组织与分享式领导
学习型组织与企业
企业创新文化与等级观念
组织目标与个人目标
初创公司人才招聘与管理
人才公司环境与企业文化
企业文化、团队文化与知识共享
高效能的团队建设
项目管理沟通计划
构建高效的研发与自动化运维
某大型电商云平台实践
互联网数据库架构设计思路
IT基础架构规划方案一(网络系统规划)
餐饮行业解决方案之客户分析流程
餐饮行业解决方案之采购战略制定与实施流程
餐饮行业解决方案之业务设计流程
供应链需求调研CheckList
企业应用之性能实时度量系统演变

如有想了解更多软件设计与架构, 系统IT,企业信息化, 团队管理 资讯,请关注我的微信订阅号:

作者:Petter Liu
出处:http://www.cnblogs.com/wintersun/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。 该文章也同时发布在我的独立博客中-Petter Liu Blog。

有关JAVA实现MQTT通讯介绍的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  4. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  5. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  6. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  7. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  8. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  9. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  10. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

随机推荐