RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。
消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
直接使用官方的 rocketmq-spring-boot-starter
4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.5 com.chaoyue rocketmq 0.0.1-SNAPSHOT rocketmq Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
2.1.1版本,内置的 rocketmq-client 版本为4.7.1应该与你 rocketmq 服务保持一致。
application.yml 文件中添加如下配置:
server: port: 8081 rocketmq: # NameServer name-server: 192.168.1.38:9876 # 默认的消息组 producer: group: group1
注入 RocketMQTemplate
package com.chaoyue.rocketmq.producer; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RocketMQProducer { @Autowired private RocketMQTemplate rocketMQTemplate; // 发送消息 public void sendMessage(String topic, String msg) { rocketMQTemplate.convertAndSend(topic, msg); } }
package com.chaoyue.rocketmq.consumer; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 泛型可选 */ @Component @RocketMQMessageListener(consumerGroup = "group1", topic = "topic1") public class RocketMQConsumer implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println("Received message : " + message); } }
package com.chaoyue.rocketmq.controller; import com.chaoyue.rocketmq.producer.RocketMQProducer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController @RequestMapping("/RocketMQ") public class RocketMQController { private final String topic = "topic1"; @Resource private RocketMQProducer producer; @RequestMapping("/sendMessage") public String sendMessage(String message) { producer.sendMessage(topic, message); return "消息已发送"; } }
启动服务后,浏览器输入:http://localhost:8081/RocketMQ/sendMessage?message=hi,sendmessage:
可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。
// 发送字符串 rocketMQTemplate.syncSend("springTopic", "Hello, World!"); // 同步发送 rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); // 发送对象 rocketMQTemplate.syncSend("userTopic", new User().setUserAge((byte) 18).setUserName("Kitty")); // 发送spring 消息 rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload( new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
异步传输一般用于响应时间敏感的业务场景。
rocketMQTemplate.asyncSend("orderPaidTopic", "异步发送", new SendCallback() { @Override public void onSuccess(SendResult var1) { // 成功回调 System.out.printf("async onSucess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { // 失败回调 System.out.printf("async onException Throwable=%s %n", var1); } });
单向传输用于需要中等可靠的情况,例如日志收集
rocketMQTemplate.sendOneway("springTopic", "Hello, World!");
rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message ").build(); // 第一个参数必须与@RocketMQTransactionListener的成员字段'transName'相同 rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null); // 使用注解@RocketMQTransactionListener定义事务监听器 @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } }
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0"); // tag0 不是消费者选择的,可以通过tag过滤掉 rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
Listmsgs = new ArrayList (); for (int i = 0; i < 10; i++) { msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i). setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build()); } SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
@Slf4j @Service @RocketMQMessageListener(topic = "laker-123", consumerGroup = "laker_consumer_group") public class MyConsumer1 implements RocketMQListener{ public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer2 implements RocketMQListener { public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } }
从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费
修改application.properties
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.consumer.group=my-group1 rocketmq.consumer.topic=test
编写代码
@SpringBootApplication public class ConsumerApplication implements CommandLineRunner { @Resource private RocketMQTemplate rocketMQTemplate; @Override public void run(String... args) throws Exception { //This is an example of pull consumer using rocketMQTemplate. Listmessages = rocketMQTemplate.receive(String.class); System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages); } }
RocketMQ-Spring 提供 请求/应答 语义支持。
发送Request消息使用SendAndReceive方法
同步发送需要在方法的参数中指明返回值类型
异步发送需要在回调的接口中指明返回值类型
// 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { System.out.printf("send user object and receive %s %n", message.toString()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }, 5000);
需要实现RocketMQReplyListener
@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){ SpringApplication.run(ConsumerApplication.class, args); } @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener{ @Override public String onMessage(String message) { System.out.printf("------- StringConsumerWithReplyString received: %s \n", message); return "reply string"; } } @Service @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer") public class ObjectConsumerWithReplyUser implements RocketMQReplyListener { public void onMessage(User user) { System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user); User replyUser = new User("replyUserName",(byte) 10); return replyUser; } } @Data @AllArgsConstructor public class User implements Serializable{ private String userName; private Byte userAge; } }
Producer 端要想使用 ACL 功能,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.access-key=AK rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", accessKey = "AK", secretKey = "SK" ) public class MyConsumer implements RocketMQListener{ ... }
注意:
可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值
Producer 端要想使用消息轨迹,需要多配置两个配置项:
## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.enable-msg-trace=true rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:
@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyConsumer implements RocketMQListener{ ... }
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC
Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。
阿里云消息轨迹正常显示需要设置accessChannel配置为CLOUD。
生产环境有多个nameserver该如何连接?
rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876
rocketMQTemplate 在什么时候被销毁?
开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。
启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please
RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGroup与topic一一对应。
发送的消息内容体是如何被序列化与反序列化的?
RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。
如何指定topic 的 tags?
RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。
注意:
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。
发送消息时如何设置消息的key?
可以通过重载的 xxxSend(String destination, Message> msg, ...) 方法来发送消息,指定msg的headers来完成。示例:
Message> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);
同理还可以根据上面的方式来设置消息的FLAG、WAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。
注意:
在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。
消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。
@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener{ public void onMessage(MessageExt messageExt) { log.info("received messageExt: {}", messageExt); } }
如何指定消费者从哪开始消费消息,或开始消费的位置?
消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:
@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。
如何发送事务消息?
在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。
注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致。
如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?
第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。
// 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写) @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876" , ... // 定义其他属性,如果有必要。 ) public class ExtRocketMQTemplate extends RocketMQTemplate { //类里面不需要做任何修改 }
第二步: 使用这个非标RocketMQTemplate
@Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean. private RocketMQTemplate extRocketMQTemplate;
接下来就可以正常使用这个extRocketMQTemplate了。
如何使用非标的RocketMQTemplate发送事务消息?
首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明为非标的RocketMQTemplate的Bean name(若不设置则默认为标准的RocketMQTemplate),比如非标的RocketMQTemplate Bean name为“extRocketMQTemplate",则代码如下:
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } }
然后使用extRocketMQTemplate调用sendMessageInTransaction()来发送事务消息。
MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的’rocketmq.name-server’属性值 ?
@Service @RocketMQMessageListener( nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyNameServerConsumer implements RocketMQListener{ ... }
https://blog.csdn.net/abu935009066/article/details/121352742
https://blog.csdn.net/u012069313/article/details/122403509