SpringBoot 集成 RocketMQ
作者:mmseoamin日期:2023-12-20

一、RocketMQ 的基本概念

1.消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

2.消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

3.消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4.主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。

5.代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6.名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

特性

  • 同步发送消息
  • 异步发送消息
  • 以单向模式发送消息
  • 发送有序消息
  • 发送批量消息
  • 发送交易消息
  • 发送具有延迟级别的预定消息
  • 以并发模式(广播/集群)消费消息
  • 消费有序消息
  • 使用标记或 sql92 表达式过滤消息
  • 支持消息追踪
  • 支持认证和授权
  • 支持请求-回复消息交换模式
  • 使用推/拉模式消费消息

    集成

    直接使用官方的 rocketmq-spring-boot-starter

    1.添加依赖

            
    
        4.0.0
        
            org.springframework.boot
            spring-boot-starter-parent
            2.6.5
             
        
        com.chaoyue
        rocketmq
        0.0.1-SNAPSHOT
        rocketmq
        Demo project for Spring Boot
        
            1.8
        
        
            
                org.springframework.boot
                spring-boot-starter-web
            
            
                org.apache.rocketmq
                rocketmq-spring-boot-starter
                2.1.1
            
            
                org.springframework.boot
                spring-boot-starter-test
                test
            
        
        
            
                
                    org.springframework.boot
                    spring-boot-maven-plugin
                
            
        
    
    

    2.1.1版本,内置的 rocketmq-client 版本为4.7.1应该与你 rocketmq 服务保持一致。

    2.添加配置

    application.yml 文件中添加如下配置:

    server:
      port: 8081
    rocketmq:
      # NameServer
      name-server: 192.168.1.38:9876
      # 默认的消息组
      producer:
        group: group1
    

    3.新建生产者类 RocketMQProducer

    注入 RocketMQTemplate

    package com.chaoyue.rocketmq.producer;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    @Component
    public class RocketMQProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        // 发送消息
        public void sendMessage(String topic, String msg) {
            rocketMQTemplate.convertAndSend(topic, msg);
        }
    }
    

    4.新建消费者类 RocketMQConsumer

    package com.chaoyue.rocketmq.consumer;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    /**
     * 泛型可选
     */
    @Component
    @RocketMQMessageListener(consumerGroup = "group1", topic = "topic1")
    public class RocketMQConsumer implements RocketMQListener {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message : " + message);
        }
    }
    

    5.新建控制类 RocketMQController

    package com.chaoyue.rocketmq.controller;
    import com.chaoyue.rocketmq.producer.RocketMQProducer;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import javax.annotation.Resource;
    @RestController
    @RequestMapping("/RocketMQ")
    public class RocketMQController {
        private final String topic = "topic1";
        @Resource
        private RocketMQProducer producer;
        @RequestMapping("/sendMessage")
        public String sendMessage(String message) {
            producer.sendMessage(topic, message);
            return "消息已发送";
        }
    }
    

    6.启动服务并测试

    启动服务后,浏览器输入:http://localhost:8081/RocketMQ/sendMessage?message=hi,sendmessage:

    SpringBoot 集成 RocketMQ,在这里插入图片描述,第1张

    生产者发送消息

    同步发送消息

    可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。

    // 发送字符串
    rocketMQTemplate.syncSend("springTopic", "Hello, World!");
    // 同步发送
    rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
    // 发送对象
    rocketMQTemplate.syncSend("userTopic", new User().setUserAge((byte) 18).setUserName("Kitty"));
    // 发送spring 消息
    rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
            new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
    

    异步发送消息

    异步传输一般用于响应时间敏感的业务场景。

    rocketMQTemplate.asyncSend("orderPaidTopic", "异步发送", new SendCallback() {
        @Override
        public void onSuccess(SendResult var1) {
            // 成功回调
            System.out.printf("async onSucess SendResult=%s %n", var1);
        }
        @Override
        public void onException(Throwable var1) {
            // 失败回调
            System.out.printf("async onException Throwable=%s %n", var1);
        }
    });
    

    单向发送消息

    单向传输用于需要中等可靠的情况,例如日志收集

    rocketMQTemplate.sendOneway("springTopic", "Hello, World!");
    

    发送有序消息

    rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
    

    发送事务消息

    Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message ").build();
    // 第一个参数必须与@RocketMQTransactionListener的成员字段'transName'相同
    rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
    // 使用注解@RocketMQTransactionListener定义事务监听器
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // ... local transaction process, return bollback, commit or unknown
                return RocketMQLocalTransactionState.UNKNOWN;
          }
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                // ... check transaction status and return bollback, commit or unknown
                return RocketMQLocalTransactionState.COMMIT;
          }
    }
    

    发送特殊标签(tag)消息

    rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0");  // tag0 不是消费者选择的,可以通过tag过滤掉
    rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
    

    发送批量消息

    List msgs = new ArrayList();
    for (int i = 0; i < 10; i++) {
        msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
    }
    SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
    

    Push 模式

    消费者

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "laker-123", consumerGroup = "laker_consumer_group")
    public class MyConsumer1 implements RocketMQListener {
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener {
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
    

    Pull 模式

    从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费

    修改application.properties

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.consumer.group=my-group1
    rocketmq.consumer.topic=test
    

    编写代码

    @SpringBootApplication
    public class ConsumerApplication implements CommandLineRunner {
        @Resource
        private RocketMQTemplate rocketMQTemplate;
        @Override
        public void run(String... args) throws Exception {
            //This is an example of pull consumer using rocketMQTemplate.
            List messages = rocketMQTemplate.receive(String.class);
            System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
        }
    }
    

    高级

    请求应答语义

    RocketMQ-Spring 提供 请求/应答 语义支持。

    producer 端

    发送Request消息使用SendAndReceive方法

    同步发送需要在方法的参数中指明返回值类型

    异步发送需要在回调的接口中指明返回值类型

    // 同步发送request并且等待String类型的返回值
    String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
    // 异步发送request并且等待User类型的返回值
    rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() {
                @Override public void onSuccess(User message) {
                    System.out.printf("send user object and receive %s %n", message.toString());
                }
                @Override public void onException(Throwable e) {
                    e.printStackTrace();
                }
            }, 5000);
    
    Consumer端

    需要实现RocketMQReplyListener 接口,其中T表示接收值的类型,R表示返回值的类型。

    @SpringBootApplication
    public class ConsumerApplication{
        
        public static void main(String[] args){
            SpringApplication.run(ConsumerApplication.class, args);
        }
        
        @Service
        @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
        public class StringConsumerWithReplyString implements RocketMQReplyListener {
            @Override
            public String onMessage(String message) {
              System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
              return "reply string";
            }
          }
       
        @Service
        @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
        public class ObjectConsumerWithReplyUser implements RocketMQReplyListener{
            public void onMessage(User user) {
              	System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
              	User replyUser = new User("replyUserName",(byte) 10);	
              	return replyUser;
            }
        }
        @Data
        @AllArgsConstructor
        public class User implements Serializable{
            private String userName;
        		private Byte userAge;
        }
    }
    

    ACL功能

    Producer 端要想使用 ACL 功能,需要多配置两个配置项:

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    rocketmq.producer.access-key=AK
    rocketmq.producer.secret-key=SK
    

    Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

    @Service
    @RocketMQMessageListener(
        topic = "test-topic-1", 
        consumerGroup = "my-consumer_test-topic-1",
        accessKey = "AK",
        secretKey = "SK"
    )
    public class MyConsumer implements RocketMQListener {
        ...
    }
    

    注意:

    可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

    消息轨迹

    Producer 端要想使用消息轨迹,需要多配置两个配置项:

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    rocketmq.producer.enable-msg-trace=true
    rocketmq.producer.customized-trace-topic=my-trace-topic
    

    Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

    @Service
    @RocketMQMessageListener(
        topic = "test-topic-1", 
        consumerGroup = "my-consumer_test-topic-1",
        enableMsgTrace = true,
        customizedTraceTopic = "my-trace-topic"
    )
    public class MyConsumer implements RocketMQListener {
        ...
    }
    

    注意:

    默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC

    Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。

    阿里云消息轨迹正常显示需要设置accessChannel配置为CLOUD。

    常见问题

    1. 生产环境有多个nameserver该如何连接?

      rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

    2. rocketMQTemplate 在什么时候被销毁?

      开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。

    3. 启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

      RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGroup与topic一一对应。

    4. 发送的消息内容体是如何被序列化与反序列化的?

      RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。

    5. 如何指定topic 的 tags?

      RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。

      注意:

      tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

    6. 发送消息时如何设置消息的key?

      可以通过重载的 xxxSend(String destination, Message msg, ...) 方法来发送消息,指定msg的headers来完成。示例:

      Message message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
      rocketMQTemplate.send("topic-test", message);
      

      同理还可以根据上面的方式来设置消息的FLAG、WAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。

      注意:

      在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。

    7. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

      消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

      @Slf4j
      @Service
      @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
      public class MyConsumer2 implements RocketMQListener{
          public void onMessage(MessageExt messageExt) {
              log.info("received messageExt: {}", messageExt);
          }
      }
      
    8. 如何指定消费者从哪开始消费消息,或开始消费的位置?

      消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

      @Slf4j
      @Service
      @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
      public class MyConsumer1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener {
          @Override
          public void onMessage(String message) {
              log.info("received message: {}", message);
          }
          @Override
          public void prepareStart(final DefaultMQPushConsumer consumer) {
              // set consumer consume message from now
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
              	  consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
          }
      }
      

      同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。

    9. 如何发送事务消息?

      在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。

      注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致。

    10. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?

      第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。

      // 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
      @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
         , ... // 定义其他属性,如果有必要。
      )
      public class ExtRocketMQTemplate extends RocketMQTemplate {
        //类里面不需要做任何修改
      }
      

      第二步: 使用这个非标RocketMQTemplate

      @Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean.
      private RocketMQTemplate extRocketMQTemplate; 
      

      接下来就可以正常使用这个extRocketMQTemplate了。

    11. 如何使用非标的RocketMQTemplate发送事务消息?

      首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明为非标的RocketMQTemplate的Bean name(若不设置则默认为标准的RocketMQTemplate),比如非标的RocketMQTemplate Bean name为“extRocketMQTemplate",则代码如下:

      @RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
          class TransactionListenerImpl implements RocketMQLocalTransactionListener {
                @Override
                public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                  // ... local transaction process, return bollback, commit or unknown
                  return RocketMQLocalTransactionState.UNKNOWN;
                }
                @Override
                public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                  // ... check transaction status and return bollback, commit or unknown
                  return RocketMQLocalTransactionState.COMMIT;
                }
          }
      

      然后使用extRocketMQTemplate调用sendMessageInTransaction()来发送事务消息。

    12. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的’rocketmq.name-server’属性值 ?

      @Service
      @RocketMQMessageListener(
         nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
         topic = "test-topic-1", 
         consumerGroup = "my-consumer_test-topic-1",
         enableMsgTrace = true,
         customizedTraceTopic = "my-trace-topic"
      )
      public class MyNameServerConsumer implements RocketMQListener {
         ...
      }
      

    参考

    https://blog.csdn.net/abu935009066/article/details/121352742

    https://blog.csdn.net/u012069313/article/details/122403509