本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
添加maven依赖:
org.apache.rocketmq rocketmq-spring-boot-starter${RELEASE.VERSION}
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { //send message synchronously rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); //send spring message rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); //send messgae asynchronously rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() { @Override public void onSuccess(SendResult var1) { System.out.printf("async onSucess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { System.out.printf("async onException Throwable=%s %n", var1); } }); //Send messages orderly rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey") //rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate } @Data @AllArgsConstructor public class OrderPaidEvent implements Serializable{ private String orderId; private BigDecimal paidMoney; } }
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口
编写代码
@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){ SpringApplication.run(ConsumerApplication.class, args); } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener{ public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer2 implements RocketMQListener { public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } }
从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876 # When set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic, rocketmqTemplate will start lite pull consumer # If you do not want to use lite pull consumer, please do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic rocketmq.pull-consumer.group=my-group1 rocketmq.pull-consumer.topic=test
注意之前lite pull consumer的生效配置为rocketmq.consumer.group和rocketmq.consumer.topic,但由于非常容易与push-consumer混淆,因此在2.2.3版本之后修改为rocketmq.pull-consumer.group和rocketmq.pull-consumer.topic.
编写代码
@SpringBootApplication public class ConsumerApplication implements CommandLineRunner { @Resource private RocketMQTemplate rocketMQTemplate; @Resource(name = "extRocketMQTemplate") private RocketMQTemplate extRocketMQTemplate; public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @Override public void run(String... args) throws Exception { //This is an example of pull consumer using rocketMQTemplate. Listmessages = rocketMQTemplate.receive(String.class); System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages); //This is an example of pull consumer using extRocketMQTemplate. messages = extRocketMQTemplate.receive(String.class); System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages); } }
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口
编写代码
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { try { // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null); } catch (MQClientException e) { e.printStackTrace(System.out); } } // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } }
Producer 端要想使用消息轨迹,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.enable-msg-trace=true rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyConsumer implements RocketMQListener{ ... }
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。
若需使用阿里云消息轨迹,则需要在@RocketMQMessageListener中将accessChannel配置为CLOUD。
Producer 端要想使用 ACL 功能,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.access-key=AK rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置:
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", accessKey = "AK", secretKey = "SK" ) public class MyConsumer implements RocketMQListener{ ... }
注意:
可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值
RocketMQ-Spring 提供 请求/应答 语义支持。
发送Request消息使用SendAndReceive方法
注意
同步发送需要在方法的参数中指明返回值类型
异步发送需要在回调的接口中指明返回值类型
@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { // 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); System.out.printf("send %s and receive %s %n", "request string", replyString); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { System.out.printf("send user object and receive %s %n", message.toString()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }, 5000); } @Data @AllArgsConstructor public class User implements Serializable{ private String userName; private Byte userAge; } }
需要实现RocketMQReplyListener
@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){ SpringApplication.run(ConsumerApplication.class, args); } @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener{ @Override public String onMessage(String message) { System.out.printf("------- StringConsumerWithReplyString received: %s \n", message); return "reply string"; } } @Service @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer") public class ObjectConsumerWithReplyUser implements RocketMQReplyListener { public User onMessage(User user) { System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user); User replyUser = new User("replyUserName",(byte) 10); return replyUser; } } @Data @AllArgsConstructor public class User implements Serializable{ private String userName; private Byte userAge; } }