Integrating RocketMQ with Spring Boot: using rocketmq-spring-boot-starter to configure, send, and consume RocketMQ messages
Author: mmseoamin. Date: 2024-03-20

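This article walks through the development details of packaging the RocketMQ client as a spring-boot-starter, then uses a simple example to show, step by step, how to configure the starter and how to send and consume RocketMQ messages with it.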

I. Usage

Add the Maven dependency:



<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

II. Sending messages

Edit application.properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

Note:

Replace 127.0.0.1:9876 in the example configuration above with the address and port of your actual RocketMQ NameServer.

1. Write the code

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    @Override
    public void run(String... args) throws Exception {
        // Send a message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
        // Send a Spring Message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // Send a message asynchronously
        rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSuccess SendResult=%s %n", var1);
            }
            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });
        // Send a message in order; messages with the same hash key go to the same queue
        rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");

        // rocketMQTemplate.destroy(); // Note: once rocketMQTemplate is destroyed, it can no longer be used to send messages
    }
    
    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable{
        private String orderId;
        
        private BigDecimal paidMoney;
    }
}
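
The "hashkey" argument of syncSendOrderly decides which queue a message lands in, so messages that share a key keep their relative order. The following is a minimal sketch of that idea in plain Java, assuming a hash-modulo selection rule. QueueSelectorSketch and selectQueue are hypothetical names used for illustration only; they are not part of rocketmq-spring.

```java
class QueueSelectorSketch {
    // Hypothetical helper: maps a hash key to a queue index.
    // The same key always yields the same index, which is what preserves order per key.
    static int selectQueue(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result in [0, queueCount) even for negative hash codes
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        String[] orderIds = {"T_001", "T_002", "T_001"};
        for (String orderId : orderIds) {
            System.out.printf("orderId=%s -> queue %d%n", orderId, selectQueue(orderId, 4));
        }
    }
}
```

Using a business identifier such as the order ID as the key keeps all events of one order in sequence while still spreading different orders across queues.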

III. Receiving messages

1. Push mode

Edit application.properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876

Note:

Replace 127.0.0.1:9876 in the example configuration above with the address and port of your actual RocketMQ NameServer.

Write the code

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
        @Override
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}
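
The type argument on the listener interface tells the framework which type each payload should be converted to before onMessage is called, which is why the listener must declare it. The following is a rough sketch of how a type argument can be read from a listener class through reflection. SketchListener and payloadType are hypothetical stand-ins written for this example; this is not the rocketmq-spring implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class ListenerTypeSketch {
    // Hypothetical stand-in for a generic listener interface.
    interface SketchListener<T> {
        void onMessage(T message);
    }

    static class StringListener implements SketchListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("received message: " + message);
        }
    }

    // A listener declared without a type argument (raw type).
    @SuppressWarnings("rawtypes")
    static class RawListener implements SketchListener {
        @Override
        public void onMessage(Object message) {
            System.out.println("received object: " + message);
        }
    }

    // Reads the T in "implements SketchListener<T>"; falls back to Object for raw types.
    static Class<?> payloadType(Class<?> listenerClass) {
        for (Type type : listenerClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == SketchListener.class) {
                    Type argument = parameterized.getActualTypeArguments()[0];
                    if (argument instanceof Class) {
                        return (Class<?>) argument;
                    }
                }
            }
        }
        return Object.class;
    }

    public static void main(String[] args) {
        System.out.println(payloadType(StringListener.class).getName());
        System.out.println(payloadType(RawListener.class).getName());
    }
}
```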

2. Pull mode

Starting with version 2.2.0, RocketMQ Spring supports pull-mode consumption.

Edit application.properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876
# When rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic are both set, rocketMQTemplate starts a lite pull consumer
# If you do not want to use the lite pull consumer, leave both properties unset
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test

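Note: the lite pull consumer used to be enabled through rocketmq.consumer.group and rocketmq.consumer.topic. Because those keys were very easy to confuse with the push consumer's settings, they were renamed to rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic after version 2.2.3.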

Write the code

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        //This is an example of pull consumer using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
        //This is an example of pull consumer using extRocketMQTemplate.
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}
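
The example above calls receive once at startup, whereas a pull consumer normally polls in a loop. The following is a minimal sketch of such a loop, assuming an empty list means that nothing was available on that poll. The Supplier is a hypothetical stand-in for a call such as rocketMQTemplate.receive(String.class), so the example runs without a broker; drain and fakeReceive are likewise invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

class PullLoopSketch {
    // Polls up to maxPolls times and collects every message that was returned.
    static List<String> drain(Supplier<List<String>> receive, int maxPolls) {
        List<String> collected = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<String> batch = receive.get();
            if (batch.isEmpty()) {
                continue; // nothing available on this poll; try again
            }
            collected.addAll(batch);
        }
        return collected;
    }

    // Hypothetical fake: hands out the given batches one per call, then empty lists.
    static Supplier<List<String>> fakeReceive(List<List<String>> batches) {
        Queue<List<String>> remaining = new ArrayDeque<>(batches);
        return () -> {
            List<String> next = remaining.poll();
            return next == null ? Collections.<String>emptyList() : next;
        };
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"), Collections.<String>emptyList(), Arrays.asList("c"));
        System.out.println(drain(fakeReceive(batches), 5));
    }
}
```

In a real application the loop would typically run on its own thread and stop on shutdown rather than after a fixed number of polls.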

IV. Transactional messages

Edit application.properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

Note:

Replace 127.0.0.1:9876 in the example configuration above with the address and port of your actual RocketMQ NameServer.

Write the code

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    public void run(String... args) throws Exception {
        try {
            // Build a SpringMessage for sending in transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // In this three-argument sendMessageInTransaction(), the first parameter is the destination topic
            // and the last parameter is an argument passed on to executeLocalTransaction()
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }
    // Define transaction listener with the annotation @RocketMQTransactionListener
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... run the local transaction, then return rollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check the transaction status and return rollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }
}
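
The two callbacks cooperate: executeLocalTransaction runs after the half message is sent, and if it returns UNKNOWN the broker later calls checkLocalTransaction to ask for a final answer. The following is a simplified simulation of that decision flow in plain Java. The State enum, resolve, and the maxChecks limit are hypothetical and only model the outcome, not the broker protocol.

```java
import java.util.function.Supplier;

class TransactionFlowSketch {
    // Mirrors the three outcomes a transaction listener can report.
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Returns true if the half message ends up visible to consumers.
    static boolean resolve(Supplier<State> executeLocal, Supplier<State> checkLocal, int maxChecks) {
        State state = executeLocal.get();
        // While the outcome is undecided, simulate the broker checking back.
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = checkLocal.get();
        }
        // In this sketch a message that is still undecided is treated as not delivered.
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        boolean delivered = resolve(() -> State.UNKNOWN, () -> State.COMMIT, 3);
        System.out.println("delivered=" + delivered);
    }
}
```

With the listener from the example above, which returns UNKNOWN first and COMMIT on check-back, the message becomes visible only after the check.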

V. Message trace

To use message trace on the producer side, configure two additional properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

On the consumer side, message trace is configured through the corresponding attributes of @RocketMQMessageListener:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

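Note:

By default, message trace is enabled for both the producer and the consumer, and the trace topic is RMQ_SYS_TRACE_TOPIC. The consumer-side trace topic can also be set once in the configuration file through rocketmq.consumer.customized-trace-topic, so it does not have to be configured on every @RocketMQMessageListener.

To use Alibaba Cloud message trace, set accessChannel to CLOUD in @RocketMQMessageListener.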
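
The notes in this section imply a lookup order for the consumer trace topic: a value set on the annotation, then the rocketmq.consumer.customized-trace-topic property, then the built-in RMQ_SYS_TRACE_TOPIC. The following is a small sketch of that order, assuming an empty or missing value means "not set". effectiveTraceTopic is a hypothetical helper written for this example, not a framework method.

```java
import java.util.HashMap;
import java.util.Map;

class TraceTopicSketch {
    static final String DEFAULT_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
    static final String PROPERTY_KEY = "rocketmq.consumer.customized-trace-topic";

    // Hypothetical helper: annotation value first, then the property, then the default.
    static String effectiveTraceTopic(String annotationValue, Map<String, String> properties) {
        if (annotationValue != null && !annotationValue.isEmpty()) {
            return annotationValue;
        }
        String configured = properties.get(PROPERTY_KEY);
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        return DEFAULT_TRACE_TOPIC;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(PROPERTY_KEY, "shared-trace-topic");
        System.out.println(effectiveTraceTopic("my-trace-topic", properties));
        System.out.println(effectiveTraceTopic("", properties));
        System.out.println(effectiveTraceTopic("", new HashMap<>()));
    }
}
```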

VI. ACL

To use ACL on the producer side, configure two additional properties:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

On the consumer side, ACL is configured in @RocketMQMessageListener:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

Note:

You do not have to configure the AK/SK on every @RocketMQMessageListener annotation. Set rocketmq.consumer.access-key and rocketmq.consumer.secret-key in the configuration file, and their values are used as the defaults.

VII. Request/reply semantics

RocketMQ-Spring supports request/reply semantics.

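  • Producer side

    Send a request message with the sendAndReceive method.

    Note

    A synchronous send must specify the return type as a method argument.

    An asynchronous send must specify the return type on the callback interface.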

    @SpringBootApplication
    public class ProducerApplication implements CommandLineRunner{
        @Resource
        private RocketMQTemplate rocketMQTemplate;
        
        public static void main(String[] args){
            SpringApplication.run(ProducerApplication.class, args);
        }
        
        public void run(String... args) throws Exception {
            // Send a request synchronously and wait for a reply of type String
            String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
            System.out.printf("send %s and receive %s %n", "request string", replyString);
            // Send a request asynchronously and wait for a reply of type User
            rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName", (byte) 9), new RocketMQLocalRequestCallback<User>() {
                @Override public void onSuccess(User message) {
                    System.out.printf("send user object and receive %s %n", message.toString());
                }
                @Override public void onException(Throwable e) {
                    e.printStackTrace();
                }
            }, 5000);
        }
        
        @Data
        @AllArgsConstructor
        public class User implements Serializable{
            private String userName;
            private Byte userAge;
        }
    }
    • Consumer side

      Implement the RocketMQReplyListener<T, R> interface, where T is the type of the received value and R is the type of the returned value.

      @SpringBootApplication
      public class ConsumerApplication{
          
          public static void main(String[] args){
              SpringApplication.run(ConsumerApplication.class, args);
          }
          
          @Service
          @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
          public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
              @Override
              public String onMessage(String message) {
                  System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
                  return "reply string";
              }
          }
         
          @Service
          @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
          public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
              @Override
              public User onMessage(User user) {
                  System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
                  User replyUser = new User("replyUserName", (byte) 10);
                  return replyUser;
              }
          }
          @Data
          @AllArgsConstructor
          public class User implements Serializable{
              private String userName;
              private Byte userAge;
          }
      }
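
Request/reply over a message queue generally works by tagging each request with a correlation ID and completing a pending result when the matching reply arrives. The following is a minimal in-memory sketch of that pattern with CompletableFuture. The class, the replier function, and the ID scheme are all hypothetical; they stand in for the broker round trip and are not how RocketMQ-Spring is implemented.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class RequestReplySketch {
    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Plays the role of the reply listener on the consumer side.
    private final Function<String, String> replier;

    RequestReplySketch(Function<String, String> replier) {
        this.replier = replier;
    }

    // Registers a pending future, "delivers" the request, and waits for the reply.
    String sendAndReceive(String request, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            // Simulate the consumer handling the request on another thread.
            CompletableFuture.runAsync(() -> onReply(correlationId, replier.apply(request)));
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for request: " + request, e);
        } finally {
            pending.remove(correlationId);
        }
    }

    // Completes the future waiting on this correlation ID, if there is one.
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RequestReplySketch sketch = new RequestReplySketch(request -> "reply to " + request);
        System.out.println(sketch.sendAndReceive("request string", 5000));
    }
}
```

The timeout plays the same role as the 5000 passed to the asynchronous sendAndReceive call in the producer example: it bounds how long the caller waits for a reply.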