草庐IT

用java开发MQTT(SSL连接)

我只是一名游客 2023-12-29 原文

近期又接触到了新的一个东西MQTT,用本地环境模拟一下吧。

主要是用EMQ作为服务器,首先当然是去官网下载一个EMQ  下载 EMQX

我本地用的是windows版本,下载完后进去bin目录后有个emqx文件
用命令窗口输入emqx start 就启动了

这里主要是用mysql来鉴权,设置一下就好了,当然建表语句参考一下文档使用 MySQL 的密码认证 | EMQX 5.0 文档

 按规则自己创建账号密码,用工具测试一下吧

 服务器就没啥问题,那么剩下客户端,

我直接上代码啥也不说了

MqttService.class
package mqtt;

/**
 * @author xxx
 * @date 2022/11/3 16:31
 * @description
 */
public interface MqttService {
    /**
     * 发布
     * @param topic
     * @param data
     */
    public void sendToMqtt(String topic, String data);

    /**
     * 订阅
     * @param topics
     * @param qoss
     */
    public void subscribeFromMqtt(String[] topics,int[] qoss);

    /**
     *  初始化订阅主题
     */
    public void subscribeInit();



}
MqttServiceImpl.class
package mqtt.impl;


import mqtt.MqttService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

/**
 * @author xxx
 * @date 2022/11/3 16:31
 * @description
 */
@Service
public class MqttServiceImpl implements MqttService,InitializingBean {


    @Value("${mqtt.username:xxx}")
    private String username;
    @Value("${mqtt.password:xxx}")
    private String password;
    @Value("${mqtt.serverURI:ssl://localhost:8883}")
    private String serverURI;
    @Value("${mqtt.clientId:xxx}")
    private String clientId;


    private MqttClient mqttClient;


    @Autowired
    private MyMqttCallback myMqttCallback;


    @Override
    public void sendToMqtt(String topic, String data) {
        MqttClient mqttClientPublish=init(clientId);
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setPayload(data.getBytes("UTF-8"));
            mqttClientPublish.publish(topic, message);
        }  catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                mqttClientPublish.disconnect();//断开连接
                mqttClientPublish.close();//抛出异常时,不能仅断开连接,内部会创建连接池等资源造成内存不够用的情况,应用close释放资源
            } catch (MqttException e1) {
                e1.printStackTrace();
            }
        }
    }

    @Override
    public void subscribeFromMqtt(String[] topics,int[] qoss) {
        try {
            mqttClient.setCallback(myMqttCallback);
            mqttClient.subscribe(topics,qoss);
        } catch (MqttException e) {
            e.printStackTrace();
            try {
                mqttClient.disconnect();//断开连接
                mqttClient.close();//抛出异常时,不能仅断开连接,内部会创建连接池等资源造成内存不够用的情况,应用close释放资源
            } catch (MqttException e1) {
                e1.printStackTrace();
            }
        }
    }

    public MqttClient init(String clientIdPublish) {
        try {
            mqttClient = new MqttClient(serverURI, clientIdPublish);//使用随机值clientId解决集群问题,但是凭证生成密码使用同一个clientId
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());
            connOpts.setCleanSession(true);
            // 设置超时时间
            connOpts.setConnectionTimeout(10);
            // 设置会话心跳时间
            connOpts.setKeepAliveInterval(60);
            // 设置自动重连
            connOpts.setAutomaticReconnect(true);
            //忽略证书验证--单向认证不需要的
            connOpts.setHttpsHostnameVerificationEnabled(false);

            //ssl 连接 , 这里的 TrustManager 是自己实现的,没有去校验服务端的证书
            TrustManager[] trustAllCerts = new TrustManager[1];
            TrustManager tm = new MyTM();
            trustAllCerts[0] = tm;
            SSLContext sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, null);
            SocketFactory factory = sc.getSocketFactory();
            connOpts.setSocketFactory(factory);
            // 建立连接
            mqttClient.connect(connOpts);
            mqttClient.setTimeToWait(5000);//客户端设置好发送超时时间,防止无限阻塞
        }catch (Exception e){
            e.printStackTrace();
        }

        return mqttClient;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化mqtt客户端
        subscribeInit();
    }


    public void subscribeInit(){
        //订阅已有的主题
        String[] topics= {"/sys/test"};
        int[] qoss={0};
        this.subscribeFromMqtt(topics,qoss);
    }


}


MyMqttCallback.class

package mqtt.impl;

import mqtt.MqttService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author xxx
 * @date 2022/11/13 0:38
 * @description
 */

@Service
public class MyMqttCallback implements MqttCallbackExtended {
    @Autowired
    private MqttService mqttService;

    @Override
    public void connectionLost(Throwable throwable) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        //消息回调
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {
        try {
            //重新订阅主题
            mqttService.subscribeInit();
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

MyTM.class
package mqtt.impl;

import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

/**
 * @author xxx
 * @date 2022/11/5 17:28
 * @description  MyTM 是自己实现的认证管理类,里面并有校验服务端的证书就返回true,永久成功!
 */
public class MyTM implements TrustManager, X509TrustManager {
    @Override
    public X509Certificate[] getAcceptedIssuers() {
        return null;
    }

    public boolean isServerTrusted(X509Certificate[] certs) {
        return true;
    }

    public boolean isClientTrusted(X509Certificate[] certs) {
        return true;
    }

    @Override
    public void checkServerTrusted(X509Certificate[] certs, String authType)
            throws CertificateException {
        return;
    }

    @Override
    public void checkClientTrusted(X509Certificate[] certs, String authType)
            throws CertificateException {
        return;
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mqtt-win-new3</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.22</version>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.15</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.mqtt.win.DemoApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

本次开发主要是为了对接设备,主要是项目启动的时候就要订阅对应的主题,所以客户端实例化启动是通过Bean的实例化完成后启动的客户端,Bean实现这个接口InitializingBean,还有客户端掉线重连后需要重新初始化订阅主题(因为cleansession设置了true)。

当然这个只是一个简单案例,实际情况肯定不止考虑这些,比如项目如果用的是集群,可以考虑用共享订阅,避免重复消费等。

            

有关用java开发MQTT(SSL连接)的更多相关文章

  1. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  2. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  3. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  4. ruby - 无法在 60 秒内获得稳定的 Firefox 连接 (127.0.0.1 :7055) - 2

    我使用的是Firefox版本36.0.1和Selenium-Webdrivergem版本2.45.0。我能够创建Firefox实例,但无法使用脚本继续进行进一步的操作无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055)错误。有人能帮帮我吗? 最佳答案 我遇到了同样的问题。降级到firefoxv33后一切正常。您可以找到旧版本here 关于ruby-无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055),我们在StackOverflow上找到一个类

  5. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  6. ruby - 是否可以覆盖 gemfile 进行本地开发? - 2

    我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI

  7. ruby - 在 Windows 机器上使用 Ruby 进行开发是否会适得其反? - 2

    这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby​​-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub

  8. ruby-on-rails - 在 Rails 开发环境中为 .ogv 文件设置 Mime 类型 - 2

    我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain

  9. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  10. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

随机推荐