MQTT是一种轻量级的消息传输协议(Message Queuing Telemetry Transport),旨在实现设备之间的低带宽和高延迟的通信。它是基于发布/订阅模式(Publish/Subscribe)的消息协议,最初由IBM开发,现在成为了一种开放的标准,被广泛应用于物联网(IoT)领域。
1、轻量级:MQTT协议设计简单,消息头部轻量,适用于受限环境的设备,如传感器、嵌入式设备等。
2、简单易用:MQTT采用发布/订阅模式,消息的发送者(发布者)和接收者(订阅者)之间解耦,通信过程简单易理解。
3、低带宽、高延迟:MQTT协议设计考虑了网络带宽受限和延迟较高的情况,能够在不理想的网络环境下保持稳定的消息传输。
4、可靠性:MQTT支持消息的持久化和确认机制,确保消息的可靠传输,同时提供了QoS(Quality of Service)等级,可以根据实际需求进行灵活配置。
5、灵活性:MQTT支持多种消息格式和负载类型,可以传输文本、二进制数据等多种类型的消息,同时支持SSL/TLS加密,保障通信安全。
5、适用于多种场景:由于其轻量级和灵活性,MQTT被广泛应用于物联网、传感器网络、远程监控、消息通知等场景,成为连接设备的重要通信协议之一。
org.springframework.integration spring-integration-mqtt
package com.ruoyi; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ruoyi.system.domain.mqtt.MqttBean; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * @author 1097022316 * 启动后建立MQTT连接 并对数据持久化和相关逻辑处理 */ @Component public class StartInit implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); //这里是从数据库中查询出所有的mqtt连接相关信息,如ip、topic等 ListcollectorList = collectorService.list(new QueryWrapper ().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic")); System.err.println("collectorList = " + collectorList); //如果没有任何连接 直接结束 if (CollUtil.isEmpty(collectorList)) { return; } List mqttBeans = new ArrayList<>(); //把从数据库中查询出来的信息组装成mqttclient连接所需要的对象 //一般都是ip、port、username、password、topic、clientid 这里是简单的用法,如有高级用法可自行摸索(顺便在下面评论教一下) collectorList.forEach(mqtt -> { MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId()); mqttBeans.add(bean); }); //对我们组装的mqtt连接对象信息进行遍历循环连接 mqttBeans.forEach(bean -> { try { MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); //设置相关连接参数,有些是必要有些是非必要 可自行点进去查看源码 connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(true); connOpts.setUserName(bean.getUserName()); connOpts.setPassword(bean.getPassword().toCharArray()); mqttClient.connect(connOpts); //把连接对象加入到全局 mqttClients.put(bean.getTopics(), mqttClient); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { System.out.println("Reconnected successfully. url = " + serverURI); } else { System.out.println("Connected successfully for the first time."); } } /** * 设置重连机制 */ @Override public void connectionLost(Throwable cause) { System.err.println(bean.getTopics() + "连接丢失" + cause.getMessage()); if (!mqttClient.isConnected()) { try { Thread.sleep(1000 * 60 * 5); //尝试连接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新连接,重新订阅!"); } } catch (InterruptedException e) { MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了" + e.getMessage(), "CLCW"); throw new RuntimeException(e); } } } @Override public void messageArrived(String topic, MqttMessage message) { //这里是当消息推送时我们做的事情 System.out.println("dosomethings..."); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("消息发送完整"); } }); mqttClient.subscribe(bean.getTopics(), 2); } catch (Exception e) { e.printStackTrace(); System.err.println(bean.getTopics() + "MQTT连接出异常了"); MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT连接出异常了"+e.getMessage(),"LJCW"); try { Thread.sleep(1000 * 60 * 30); //尝试连接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新连接,重新订阅!"); } } catch (InterruptedException ex) { //可以在这里把报错信息存入本地看看 throw new RuntimeException(ex); } } }); } }
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author 1097022316 */ @Data @AllArgsConstructor @NoArgsConstructor public class MqttBean { private String url; private String userName; private String password; private String topics; private String clientId; }
重点是MqttClient中几个参数和配置参数,以及那几个重写的方法,看下源码就好了。这里用的比较粗糙,只是简单的实现了连接和重连,一些复杂的如心跳或者遗嘱啥的都没用,要研究可自行查看