springboot集成mqtt
作者:mmseoamin日期:2024-04-01

文章目录

  • 前言
  • 一、MQTT是什么?
  • 二、继承步骤
    • 1.安装MQTT
    • 2.创建项目,引入依赖
    • 3. 对应步骤2的代码
    • 3 测试
    • 总结
      • mqtt 启动后访问地址

        前言

        随着物联网的火热,MQTT的应用逐渐增多

        曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;


        一、MQTT是什么?

        可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。

        MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。

        MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。


        个人简单总结:

        1. 每个客户端可以订阅一个或者多个主题(发消息,收消息)
        2. 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息)
        3. 客户端A发送消息给客户端B流程为:
        客户端A>>>Broker>>>客户端B
        --- 
        前置条件:
        a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
        b: 客户端B 订阅主题
        

        二、继承步骤

        1.安装MQTT

        这里直接采用windows版本,解压版,比较快

        • 下载地址 MQTT-windows版本
        • 解压后,在bin文件下执行运行命令 .\emqx console
        • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/public

          springboot集成mqtt,MQTT管理页面,第1张

          2.创建项目,引入依赖

          大致分为如下步骤:

          • yml配置 主题 用户名 密码
          • 根据配置创建客户端实例,实例订阅主题
          • 实现 MqttCallback 接口
            1. 重连处理 connectionLost
            2. 消息接受处理 messageArrived
            3. 消息发生成功处理 deliveryComplete
            
          • 根据客户端信息发送某个主题的消息

            3. 对应步骤2的代码

            1. yml配置
            server:
              port: 8081
            # 下面这里要看你自己的需求
            customer:
              mqtt:
                broker: tcp://127.0.0.1:1883
                clientList:
                  #发布客户端ID
                  - clientId: nxys_service
                    #监听主题 同时订阅多个主题使用 - 分割开
                    subscribeTopic: mqtt/publish
                    #用户名
                    userName: admin
                    #密码
                    password: public
                  #接受客户端ID
                  - clientId: receive_service
                    #监听主题 同时订阅多个主题使用 - 分割开
                    subscribeTopic: mqtt/receive
                    #用户名
                    userName: admin
                    #密码
                    password: public
            
            1. 实例信息获取
            /**
             * Mqtt配置类
             */
            @Data
            @Configuration
            @ConfigurationProperties(prefix = "customer.mqtt")
            public class MqttConfig {
                /**
                 * mqtt broker地址
                 */
                String broker;
                /**
                 * 需要创建的MQTT客户端
                 */
                List clientList;
            }
            
            /**
             * MQTT客户端
             */
            @Data
            public class MqttClient {
                /**
                 * 客户端ID
                 */
                private String clientId;
                /**
                 * 监听主题
                 */
                private String subscribeTopic;
                /**
                 * 用户名
                 */
                private String userName;
                /**
                 * 密码
                 */
                private String password;
            }
            
            1. 根据信息创建实例,订阅主题
            /**
             * MQTT客户端创建
             */
            @Component
            @Slf4j
            public class MqttClientCreate {
                @Resource
                private MqttClientManager mqttClientManager;
                @Autowired
                private MqttConfig mqttConfig;
                /**
                 * 创建MQTT客户端
                 */
                @PostConstruct
                public void createMqttClient() {
                    List mqttClientList = mqttConfig.getClientList();
                    for (MqttClient mqttClient : mqttClientList) {
                        log.info("{}", mqttClient);
                        //创建客户端,客户端ID:demo,回调类跟客户端ID一致
                        mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
                    }
                }
            }
            
            /**
             * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
             */
            @Slf4j
            @Component
            public class MqttClientManager {
                @Value("${customer.mqtt.broker}")
                private String mqttBroker;
                @Resource
                private MqttCallBackContext mqttCallBackContext;
                /**
                 * 存储MQTT客户端
                 */
                public static Map MQTT_CLIENT_MAP = new ConcurrentHashMap<>();
                public static MqttClient getMqttClientById(String clientId) {
                    return MQTT_CLIENT_MAP.get(clientId);
                }
                /**
                 * 创建mqtt客户端
                 *
                 * @param clientId       客户端ID
                 * @param subscribeTopic 订阅主题,可为空
                 * @param userName       用户名,可为空
                 * @param password       密码,可为空
                 * @return mqtt客户端
                 */
                public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
                    MemoryPersistence persistence = new MemoryPersistence();
                    try {
                        MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
                        MqttConnectOptions connOpts = new MqttConnectOptions();
                        if (null != userName && !"".equals(userName)) {
                            connOpts.setUserName(userName);
                        }
                        if (null != password && !"".equals(password)) {
                            connOpts.setPassword(password.toCharArray());
                        }
                        connOpts.setCleanSession(true);
                        if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                            AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
                            if (null == callBack) {
                                callBack = mqttCallBackContext.getCallBack("default");
                            }
                            callBack.setClientId(clientId);
                            callBack.setConnectOptions(connOpts);
                            client.setCallback(callBack);
                        }
                        //连接mqtt服务端broker
                        client.connect(connOpts);
                        // 订阅主题
                        if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                            if (subscribeTopic.contains("-"))
                                client.subscribe(subscribeTopic.split("-"));
                            else
            //                    if (!subscribeTopic.equals("mqtt/receive"))
                            {
                                client.subscribe(subscribeTopic);
                            }
                        }
                        MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
                    } catch (MqttException e) {
                        log.error("Create mqttClient failed!", e);
                    }
                }
            }
            
            1. 实现 MqttCallback 接口
            /**
             * MQTT回调抽象类
             */
            @Slf4j
            public abstract class AbsMqttCallBack implements MqttCallback {
                private String clientId;
                private MqttConnectOptions connectOptions;
             
                public String getClientId() {
                    return clientId;
                }
             
                public void setClientId(String clientId) {
                    this.clientId = clientId;
                }
             
                public MqttConnectOptions getConnectOptions() {
                    return connectOptions;
                }
             
                public void setConnectOptions(MqttConnectOptions connectOptions) {
                    this.connectOptions = connectOptions;
                }
             
                /**
                 * 失去连接操作,进行重连
                 *
                 * @param throwable 异常
                 */
                @Override
                public void connectionLost(Throwable throwable) {
                    try {
                        if (null != clientId) {
                            if (null != dconnectOptions) {
                                MqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                            } else {
                                MqttClientManager.getMqttClientById(clientId).connect();
                            }
                        }
             
                    } catch (Exception e) {
                        log.error("{} reconnect failed!", e);
                    }
                }
             
                /**
                 * 接收订阅消息
                 * @param topic    主题
                 * @param mqttMessage 接收消息
                 * @throws Exception 异常
                 */
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            		String content = new String(mqttMessage.getPayload());
                 	handleReceiveMessage(topic, content);
                }
             
                /**
                 * 消息发送成功
                 *
                 * @param iMqttDeliveryToken toke
                 */
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    log.info("消息发送成功");
                }
             
             
                /**
                 * 处理接收的消息
                 * @param topic   主题
                 * @param message 消息内容
                 */
                protected abstract void handleReceiveMessage(String topic, String message);
            }
            
            /**
             * 默认回调
             */
            @Slf4j
            @Component("default")
            public class DefaultMqttCallBack extends AbsMqttCallBack {
                /**
                 * @param topic   主题
                 * @param message 消息内容
                 */
                @Override
                protected void handleReceiveMessage(String topic, String message) {
                    log.info("接收到主题---{}", topic);
                    log.info("接收到消息---{}", message);
                    // 你自己的消息处理业务
                }
            }
            
            /**
             * MQTT订阅回调环境类
             */
            @Component
            @Slf4j
            public class MqttCallBackContext {
                private final Map callBackMap = new ConcurrentHashMap<>();
                /**
                 * 默认构造函数
                 *
                 * @param callBackMap 回调集合
                 */
                public MqttCallBackContext(Map callBackMap) {
                    this.callBackMap.clear();
                    this.callBackMap.putAll(callBackMap);
                }
                /**
                 * 获取MQTT回调类
                 *
                 * @param clientId 客户端ID
                 * @return MQTT回调类
                 */
                public AbsMqttCallBack getCallBack(String clientId) {
                    return this.callBackMap.get(clientId);
                }
            }
            
            1. 发送消息
            @RestController
            public class SendController {
                @Resource
                MqttClientManager mqttClientManager;
                @RequestMapping("/sendMessage")
                public String sendMessage(String topic){
                    try {
                        MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
                        mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);
                        return "发送成功";
                    } catch (Exception e) {
                        e.printStackTrace();
                        return "发送失败";
                    }
                }
            }
            

            3 测试

            1. 启动订阅,查看MQTT 管理页面

              springboot集成mqtt,两个实例,第2张

            2. 测试发送消息,查看发送情况,接受情况

              http://localhost:8081/sendMessage?topic=mqtt/receive

              springboot集成mqtt,发送成功,并接受到消息,第3张


            总结

            文中涉及的所有代码: MQTT-Demo

            mqtt 启动后访问地址

            http://localhost:18083/#/