生产者端:
@RestController @RequiredArgsConstructor public class TestController { private final MqttProducer mqttProducer; @GetMapping("/test") public String test() { User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build(); // 延时发布 mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build)); return "ok"; } }
消费者端:
/**
 * Consumer-side handler registered for the {@code cookie} topic via {@code @Topic}.
 *
 * @author : Cookie
 * date : 2024/1/30
 */
@Component
@Topic("cookie")
public class TestTopicHandler implements MsgHandler {

    /**
     * Parses the JSON payload into a {@code User} and prints it to standard output.
     *
     * @param jsonMsg JSON text of the received message
     */
    @Override
    public void process(String jsonMsg) {
        User parsedUser = JSON.parseObject(jsonMsg, User.class);
        System.out.println(parsedUser);
    }
}
控制台结果:
具体解释在之前的笔记中, 需要的话可以查看 EMQ的介绍及整合SpringBoot的使用-CSDN博客
OK, 下面我们就开始实现上面的逻辑, 你要做的就是把 1-9 复制到项目即可
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
# 顶格
mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.200.128:1883
    clientId: monitor.task.${random.int[10000,99999]} # 注意: emq的客户端id 不能重复
    keepAliveInterval: 10 #连接保持检查周期 秒
    connectionTimeout: 30 #连接超时时间 秒
  producer:
    defaultQos: 2
    defaultRetained: false
    defaultTopic: topic/test1
  consumer:
    consumerTopics: $queue/cookie/#, $share/group1/yfs1024 #不带群组的共享订阅 多个主题逗号隔开
    # $queue/cookie/#
    #   以$queue开头,不带群组的共享订阅 多个客户端只能有一个消费者消费
    # $share/group1/yfs1024
    #   以$share开头,群组的共享订阅 多个客户端订阅
    #   如果在一个组 只能有一个消费者消费
    #   如果不在一个组 都可以消费
/**
 * Typed view of the configuration properties bound from the {@code mqtt.client} prefix.
 *
 * <p>NOTE(review): the sample configuration shown alongside this class places the
 * producer defaults under {@code mqtt.producer.*}, not {@code mqtt.client.*}, so the
 * three producer-related fields below appear to stay at their Java defaults - confirm.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttConfigProperties {

    /** Default QoS for published messages. */
    private int defaultProducerQos;

    /** Default "retained" flag for published messages. */
    private boolean defaultRetained;

    /** Default topic for published messages. */
    private String defaultTopic;

    /** Broker login user name. */
    private String username;

    /** Broker login password. */
    private String password;

    /** Broker address, e.g. {@code tcp://host:1883}. */
    private String serverURI;

    /** MQTT client identifier. */
    private String clientId;

    /** Keep-alive interval, in seconds. */
    private int keepAliveInterval;

    /** Connection timeout, in seconds. */
    private int connectionTimeout;
}
/**
 * Marks a message-handler class with the MQTT topic it is responsible for.
 * Retained at runtime so the value can be read reflectively.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Topic {

    /**
     * @return the topic name handled by the annotated class
     */
    String value();
}
@Data @Slf4j @Configuration @RequiredArgsConstructor public class MqttConfig { private final MqttConfigProperties configProperties; private final MqttCallback mqttCallback; @Bean public MqttClient mqttClient() { try { MqttClient client = new MqttClient(configProperties.getServerURI(), configProperties.getClientId(), mqttClientPersistence()); client.setManualAcks(true); //设置手动消息接收确认 mqttCallback.setMqttClient(client); client.setCallback(mqttCallback); client.connect(mqttConnectOptions()); return client; } catch (MqttException e) { log.error("emq connect error", e); return null; } } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(configProperties.getUsername()); options.setPassword(configProperties.getPassword().toCharArray()); options.setAutomaticReconnect(true);//是否自动重新连接 options.setCleanSession(true);//是否清除之前的连接信息 options.setConnectionTimeout(configProperties.getConnectionTimeout());//连接超时时间 options.setKeepAliveInterval(configProperties.getKeepAliveInterval());//心跳 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//设置mqtt版本 return options; } public MqttClientPersistence mqttClientPersistence() { return new MemoryPersistence(); } }
/**
 * Contract for a component that processes one received message.
 */
public interface MsgHandler {

    /**
     * Processes a single message.
     *
     * @param jsonMsg the message content as text
     * @throws IOException if the message cannot be processed
     */
    void process(String jsonMsg) throws IOException;
}
/**
 * Message-handling context: looks up the handler responsible for a topic.
 */
public interface MsgHandlerContext {

    /**
     * @param topic the topic name
     * @return the handler for {@code topic}
     */
    MsgHandler getMsgHandler(String topic);
}
@Component @Slf4j public class MqttCallback implements MqttCallbackExtended { // 需要订阅的topic配置 @Value("${mqtt.consumer.consumerTopics}") private ListconsumerTopics; @Autowired private MsgHandlerContext msgHandlerContext; @Override public void connectionLost(Throwable throwable) { log.error("emq error.", throwable); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("topic:" + topic + " message:" + new String(message.getPayload())); //处理消息 String msgContent = new String(message.getPayload()); log.info("接收到消息:" + msgContent); try { // 根据主题名称 获取 该主题对应的处理器对象 // 多态 父类的引用指向子类的对象 MsgHandler msgHandler = msgHandlerContext.getMsgHandler(topic); if (msgHandler == null) { return; } msgHandler.process(msgContent); //执行 } catch (IOException e) { log.error("process msg error,msg is: " + msgContent, e); } //处理成功后确认消息 mqttClient.messageArrivedComplete(message.getId(), message.getQos()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } @Override public void connectComplete(boolean b, String s) { log.info("连接成功"); //和EMQ连接成功后根据配置自动订阅topic if (consumerTopics != null && consumerTopics.size() > 0) { // 循环遍历当前项目中配置的所有的主题. consumerTopics.forEach(t -> { try { log.info(">>>>>>>>>>>>>>subscribe topic:" + t); // 订阅当前集群中所有的主题 消息服务质量 2 -> 至少收到一个 mqttClient.subscribe(t, 2); } catch (MqttException e) { log.error("emq connect error", e); } }); } } private MqttClient mqttClient; // 在配置类中调用传入连接 public void setMqttClient(MqttClient mqttClient) { this.mqttClient = mqttClient; } }
作用: 通过 handlerMap 将 Topic 与其对应的处理类建立映射
/** * 消息处理类加载器 * 作用: * 1. 因为实现了Spring 的 ApplicationContextAware 接口, 项目启动后就会运行实现的方法 * 2. 获取MsgHandler接口的所有的实现类 * 3. 将实现类上的Topic注解的值,作为handlerMap的键,实现类(处理器)作为对应的值 */ @Component public class MsgHandlerContextImp implements ApplicationContextAware, MsgHandlerContext { private final MaphandlerMap = new HashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 从spring容器中获取 <所有> 实现了MsgHandler接口的对象 // key 默认类名首字母小写 value 当前对象 Map map = applicationContext.getBeansOfType(MsgHandler.class); map.values().forEach(obj -> { // 通过反射拿到注解中的值 即 当前类处理的 topic String topic = obj.getClass().getAnnotation(Topic.class).value(); // 将主题和当前主题的处理类建立映射 handlerMap.put(topic,obj); }); } @Override public MsgHandler getMsgHandler(String topic) { return handlerMap.get(topic); } }
@Slf4j @Component public class MqttProducer { // @Value() 读取配置 当然也可以批量读取配置,这里就一个一个了 @Value("${mqtt.producer.defaultQos}") private int defaultProducerQos; @Value("${mqtt.producer.defaultRetained}") private boolean defaultRetained; @Value("${mqtt.producer.defaultTopic}") private String defaultTopic; @Autowired private MqttClient mqttClient; public void send(String payload) { this.send(defaultTopic, payload); } public void send(String topic, String payload) { this.send(topic, defaultProducerQos, payload); } public void send(String topic, int qos, String payload) { this.send(topic, qos, defaultRetained, payload); } public void send(String topic, int qos, boolean retained, String payload) { try { mqttClient.publish(topic, payload.getBytes(), qos, retained); } catch (MqttException e) { log.error("publish msg error.",e); } } publicvoid send(String topic, int qos, T msg) throws JsonProcessingException { String payload = JsonUtil.serialize(msg); this.send(topic,qos,payload); } }
@RestController @RequiredArgsConstructor public class TestController { private final MqttProducer mqttProducer; @GetMapping("/test") public String test() { User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build(); // 延时发布 mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build)); return "ok"; } }
/**
 * Handler bound to the {@code cookie} topic through {@code @Topic}.
 */
@Component
@Topic("cookie")
public class TestTopicHandler implements MsgHandler {

    /**
     * Converts the JSON message into a {@code User} and writes it to standard output.
     *
     * @param jsonMsg JSON text of the received message
     */
    @Override
    public void process(String jsonMsg) {
        User received = JSON.parseObject(jsonMsg, User.class);
        System.out.println(received);
    }
}
控制台结果展示:
补充JsonUtil
public class JsonUtil { /** * 从json字符串中根据nodeName获取值 * @param nodeName * @param json * @return * @throws IOException */ public static String getValueByNodeName(String nodeName, String json) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(json); JsonNode node = jsonNode.findPath(nodeName); if(node == null) return null; return node.asText(); } /** * 根据nodeName获取节点内容 * @param nodeName * @param json * @return * @throws IOException */ public static JsonNode getNodeByName(String nodeName, String json) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readTree(json).findPath(nodeName); } /** * 反序列化 * @param json * @param clazz * @param* @return * @throws IOException */ public static T getByJson(String json, Class clazz) throws IOException { ObjectMapper mapper = new ObjectMapper(); // 在反序列化时忽略在 json 中存在但 Java 对象不存在的属性 mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 在序列化时日期格式默认为 yyyy-MM-dd'T'HH:mm:ss.SSSZ mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); return mapper.readValue(json, clazz); } /** * 反序列化(驼峰转换) * @param json * @param clazz * @param * @return * @throws IOException */ public static T getByJsonSNAKE(String json, Class clazz) throws IOException { ObjectMapper mapper = new ObjectMapper(); // 在反序列化时忽略在 json 中存在但 Java 对象不存在的属性 mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 在序列化时日期格式默认为 yyyy-MM-dd'T'HH:mm:ss.SSSZ mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); // 设置驼峰和下划线之间的映射 mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return mapper.readValue(json, clazz); } /** * 序列化 * @param object * @return * @throws JsonProcessingException */ public static String serialize(Object object) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(object); } /** * 序列化(驼峰转换) * @param object * 
@return * @throws JsonProcessingException */ public static String serializeSNAKE(Object object) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); // 设置驼峰和下划线之间的映射 mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return mapper.writeValueAsString(object); } public static JsonNode getTreeNode(String json) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readTree(json); } /** * 将对象转map * @param obj * @return * @throws IOException */ public static Map convertToMap(Object obj) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(serialize(obj),Map.class); } }