Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
作者:mmseoamin日期:2024-02-04

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动,第1张

目录

1为什么要使用MQ?

2整合RocketMQ

依赖版本

引入RocketMQ依赖

解决Spring Boot3+不兼容 spring.factories

参考配置文件

3RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

RocketMQ操作工具

定义RocketMQ消费者

定义测试类发送消息

4测试


1为什么要使用MQ?

在 Spring Boot Event这篇文章中 已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。

通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动,第2张

源码地址:

https://gitee.com/sparkle3021/springboot3-study

2整合RocketMQ

依赖版本
  • JDK 17

  • Spring Boot 3.2.0

  • RocketMQ-Client 5.0.4

  • RocketMQ-Starter 2.2.0

    Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

    引入RocketMQ依赖
    
        org.apache.rocketmq
        rocketmq-client-java
        5.0.4
    
    
        org.apache.rocketmq
        rocketmq-spring-boot-starter
        2.2.0
    
    
    解决Spring Boot3+不兼容 spring.factories

    rocketmq-spring-boot-starter:2.2.2版本中:

    Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动,第3张

    参考配置文件
    # RocketMQ 配置
    rocketmq:
      name-server: 127.0.0.1:9876
      consumer:
        group: event-mq-group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 1
      producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: event-mq-group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试此处,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false
    
    • 方法一 :通过 @Import(RocketMQAutoConfiguration.class) 在配置类中引入
    • 方法二:在resources资源目录下创建文件夹及文件 META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports。

      文件内容为RocketMQ自动配置类路径: org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

      3RocketMQ 使用

      解决Spring Boot3+不支持spring.factories的问题
      import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.context.annotation.Import;
      /**
       * 启动类
       */
      @Import(RocketMQAutoConfiguration.class)
      @SpringBootApplication
      public class MQEventApplication {
          public static void main(String[] args) {
              SpringApplication.run(MQEventApplication.class, args);
          }
      }
      
      RocketMQ操作工具

      RocketMQ Message实体

      import cn.hutool.core.util.IdUtil;
      import jakarta.validation.constraints.NotBlank;
      import lombok.AllArgsConstructor;
      import lombok.Builder;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.commons.collections.CollectionUtils;
      import org.apache.commons.lang3.ObjectUtils;
      import org.springframework.messaging.Message;
      import org.springframework.messaging.support.MessageBuilder;
      import java.io.Serializable;
      import java.util.List;
      /**
       * RocketMQ 消息
       */
      @Data
      @Builder
      @AllArgsConstructor
      @NoArgsConstructor
      public class RocketMQMessage implements Serializable {
          /**
           * 消息队列主题
           */
          @NotBlank(message = "MQ Topic 不能为空")
          private String topic;
          /**
           * 延迟级别
           */
          @Builder.Default
          private DelayLevel delayLevel = DelayLevel.OFF;
          /**
           * 消息体
           */
          private T message;
          /**
           * 消息体
           */
          private List messages;
          /**
           * 使用有序消息发送时,指定发送到队列
           */
          private String hashKey;
          /**
           * 任务Id,用于日志打印相关信息
           */
          @Builder.Default
          private String taskId = IdUtil.fastSimpleUUID();
      }
      

      RocketMQTemplate 二次封装

      import com.yiyan.study.domain.RocketMQMessage;
      import jakarta.annotation.Resource;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.commons.lang3.StringUtils;
      import org.apache.rocketmq.client.producer.SendCallback;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.spring.core.RocketMQTemplate;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;
      /**
       * RocketMQ 消息工具类
       */
      @Slf4j
      @Component
      public class RocketMQService {
          @Resource
          private RocketMQTemplate rocketMQTemplate;
          @Value("${rocketmq.producer.sendMessageTimeout}")
          private int sendMessageTimeout;
          /**
           * 异步发送消息回调
           *
           * @param taskId 任务Id
           * @param topic  消息主题
           * @return the send callback
           */
          private static SendCallback asyncSendCallback(String taskId, String topic) {
              return new SendCallback() {
                  @Override
                  public void onSuccess(SendResult sendResult) {
                      log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
                  }
                  @Override
                  public void onException(Throwable throwable) {
                      log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
                  }
              };
          }
          /**
           * 发送同步消息,使用有序发送请设置HashKey
           *
           * @param message 消息参数
           */
          public  void syncSend(RocketMQMessage message) {
              log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
              SendResult sendResult;
              if (StringUtils.isNotBlank(message.getHashKey())) {
                  sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
              } else {
                  sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
              }
              log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                      message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
          }
          /**
           * 批量发送同步消息
           *
           * @param message 消息参数
           */
          public  void syncSendBatch(RocketMQMessage message) {
              log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                      message.getTaskId(), message.getTopic(), message.getMessages().size());
              SendResult sendResult;
              if (StringUtils.isNotBlank(message.getHashKey())) {
                  sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
              } else {
                  sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
              }
              log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
                      message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
          }
          /**
           * 异步发送消息,异步返回消息结果
           *
           * @param message 消息参数
           */
          public  void asyncSend(RocketMQMessage message) {
              log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
              if (StringUtils.isNotBlank(message.getHashKey())) {
                  rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
                          asyncSendCallback(message.getTaskId(), message.getTopic()));
              } else {
                  rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
                          asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
              }
          }
          /**
           * 批量异步发送消息
           *
           * @param message 消息参数
           */
          public  void asyncSendBatch(RocketMQMessage message) {
              log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
                      message.getTaskId(), message.getTopic(), message.getMessages().size());
              if (StringUtils.isNotBlank(message.getHashKey())) {
                  rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
                          asyncSendCallback(message.getTaskId(), message.getTopic()));
              } else {
                  rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
                          asyncSendCallback(message.getTaskId(), message.getTopic()));
              }
          }
          /**
           * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
           *
           * @param message 消息参数
           */
          public  void sendOneWay(RocketMQMessage message) {
              sendOneWay(message, false);
          }
          /**
           * 单向消息 - 批量发送
           *
           * @param message 消息体
           * @param batch   是否为批量操作
           */
          public  void sendOneWay(RocketMQMessage message, boolean batch) {
              log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
                              : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
                      message.getTaskId(), message.getTopic(), message.getMessages().size());
              if (StringUtils.isNotBlank(message.getHashKey())) {
                  if (batch) {
                      message.getMessages().
                              forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
                  } else {
                      rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
                  }
              } else {
                  if (batch) {
                      message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
                  } else {
                      rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
                  }
              }
          }
      }
      
      定义RocketMQ消费者
      import com.yiyan.study.constants.MQConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
      import org.apache.rocketmq.spring.core.RocketMQListener;
      import org.springframework.stereotype.Component;
      /**
       * MQ消息监听
       */
      @Component
      @Slf4j
      @RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
              consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
      public class MQListener implements RocketMQListener {
          @Override
          public void onMessage(String message) {
              log.info("MQListener 接收消息 : {}", message);
          }
      }
      
      定义测试类发送消息
      import cn.hutool.core.thread.ThreadUtil;
      import com.yiyan.study.constants.MQConfig;
      import com.yiyan.study.domain.RocketMQMessage;
      import com.yiyan.study.utils.RocketMQService;
      import jakarta.annotation.Resource;
      import org.junit.jupiter.api.Test;
      import org.springframework.boot.test.context.SpringBootTest;
      /**
       * MQ测试
       */
      @SpringBootTest
      public class MQTest {
          @Resource
          private RocketMQService rocketMQService;
          @Test
          public void sendMessage() {
              int count = 1;
              while (count <= 50) {
                  rocketMQService.syncSend(RocketMQMessage.builder()
                          .topic(MQConfig.EVENT_TOPIC)
                          .message(count++)
                          .build());
              }
              // 休眠等待消费消息
              ThreadUtil.sleep(2000L);
          }
      }
      

      4测试

      Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动,第4张

      总结 

      感谢您的阅读~

      Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动,第5张