SpringBoot整合MOTT动态读取数据库连接信息并连接MQTT服务端
作者:mmseoamin日期:2024-04-27

SpringBoot整合MOTT动态读取数据库连接信息并连接MQTT服务端

MQTT介绍:

概述:

MQTT是一种轻量级的消息传输协议(Message Queuing Telemetry Transport),旨在实现设备之间的低带宽和高延迟的通信。它是基于发布/订阅模式(Publish/Subscribe)的消息协议,最初由IBM开发,现在成为了一种开放的标准,被广泛应用于物联网(IoT)领域。

MQTT特点包括:

1、轻量级:MQTT协议设计简单,消息头部轻量,适用于受限环境的设备,如传感器、嵌入式设备等。

2、简单易用:MQTT采用发布/订阅模式,消息的发送者(发布者)和接收者(订阅者)之间解耦,通信过程简单易理解。

3、低带宽、高延迟:MQTT协议设计考虑了网络带宽受限和延迟较高的情况,能够在不理想的网络环境下保持稳定的消息传输。

4、可靠性:MQTT支持消息的持久化和确认机制,确保消息的可靠传输,同时提供了QoS(Quality of Service)等级,可以根据实际需求进行灵活配置。

5、灵活性:MQTT支持多种消息格式和负载类型,可以传输文本、二进制数据等多种类型的消息,同时支持SSL/TLS加密,保障通信安全。

5、适用于多种场景:由于其轻量级和灵活性,MQTT被广泛应用于物联网、传感器网络、远程监控、消息通知等场景,成为连接设备的重要通信协议之一。

话不多说,直接看代码如何连接

因项目需求,本次做的是在项目启动时,动态读取数据库中已经配置好的mqtt连接信息,并且根据这些信息动态的循环连接服务端,在接收到消息后进行持久化和相关逻辑处理。

一、首先加载依赖


    org.springframework.integration
    spring-integration-mqtt

二、因为是要在项目启动时候连接,但是又要等项目初始化后拿到要用的mapper,所以在这个类中需要实现ApplicationRunner接口,而没有用其他的方法,有多种实现但是我用的这个
package com.ruoyi;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.system.domain.mqtt.MqttBean;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author 1097022316
 * 启动后建立MQTT连接 并对数据持久化和相关逻辑处理
 */
@Component
public class StartInit implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        MemoryPersistence persistence = new MemoryPersistence();
        
        //这里是从数据库中查询出所有的mqtt连接相关信息,如ip、topic等
        List collectorList = collectorService.list(new QueryWrapper().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic"));
        
        System.err.println("collectorList = " + collectorList);
        
        //如果没有任何连接 直接结束
        if (CollUtil.isEmpty(collectorList)) {
            return;
        }
        
        List mqttBeans = new ArrayList<>();
        
        //把从数据库中查询出来的信息组装成mqttclient连接所需要的对象
        //一般都是ip、port、username、password、topic、clientid 这里是简单的用法,如有高级用法可自行摸索(顺便在下面评论教一下)
        
        collectorList.forEach(mqtt -> {
            MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId());
            mqttBeans.add(bean);
        });
        //对我们组装的mqtt连接对象信息进行遍历循环连接
        mqttBeans.forEach(bean -> {
            try {
                    MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence);
                    MqttConnectOptions connOpts = new MqttConnectOptions();
                	//设置相关连接参数,有些是必要有些是非必要 可自行点进去查看源码
                    connOpts.setAutomaticReconnect(false);
                    connOpts.setCleanSession(true);
                    connOpts.setUserName(bean.getUserName());
                    connOpts.setPassword(bean.getPassword().toCharArray());
                    mqttClient.connect(connOpts);
                    //把连接对象加入到全局
                    mqttClients.put(bean.getTopics(), mqttClient);
                    mqttClient.setCallback(new MqttCallbackExtended() {
                        @Override
                        public void connectComplete(boolean reconnect, String serverURI) {
                            if (reconnect) {
                                System.out.println("Reconnected successfully.  url = " + serverURI);
                            } else {
                                System.out.println("Connected successfully for the first time.");
                            }
                        }
                        /**
                         * 设置重连机制
                         */
                        @Override
                        public void connectionLost(Throwable cause) {
                            System.err.println(bean.getTopics() + "连接丢失" + cause.getMessage());
                            if (!mqttClient.isConnected()) {
                                try {
                                    Thread.sleep(1000 * 60 * 5);
                                    //尝试连接
                                    System.out.println("bean = " + bean);
                                    boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
                                    if (flag) {
                                        System.err.println(bean.getTopics() + "重新连接,重新订阅!");
                                    }
                                } catch (InterruptedException e) {
                                    MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了" + e.getMessage(), "CLCW");
                                    throw new RuntimeException(e);
                                }
                            }
                        }
                        @Override
                        public void messageArrived(String topic, MqttMessage message) {
							//这里是当消息推送时我们做的事情
                            System.out.println("dosomethings...");
                        }
                        @Override
                        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                            System.out.println("消息发送完整");
                        }
                    });
                    mqttClient.subscribe(bean.getTopics(), 2);
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(bean.getTopics() + "MQTT连接出异常了");
                MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了"+e.getMessage(),"LJCW");
                try {
                    Thread.sleep(1000 * 60 * 30);
                    //尝试连接
                    System.out.println("bean = " + bean);
                    boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
                    if (flag) {
                        System.err.println(bean.getTopics() + "重新连接,重新订阅!");
                    }
                } catch (InterruptedException ex) {
                    //可以在这里把报错信息存入本地看看
                    throw new RuntimeException(ex);
                }
            }
        });
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * @author 1097022316
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttBean {
    private String url;
    private String userName;
    private String password;
    private String topics;
    private String clientId;
}

重点是MqttClient中几个参数和配置参数,以及那几个重写的方法,看下源码就好了。这里用的比较粗糙,只是简单的实现了连接和重连,一些复杂的如心跳或者遗嘱啥的都没用,要研究可自行查看