RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ
作者:mmseoamin日期:2023-12-21

文章目录

    • 添加 RocketMQ 依赖
    • 消费者 Consumer
      • YAML 配置
      • 创建监听器
      • 消息过滤
        • Tag 过滤
        • 生产者 Producer
          • YAML 配置
          • 发送同步消息
          • 发送异步消息
          • 发送单向消息
          • 发送延迟消息
          • 发送顺序消息
          • 发送批量消息
          • 发送集合消息

            添加 RocketMQ 依赖

            1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

              RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527214713414,第1张

            2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

              
              
                  org.apache.rocketmq
                  rocketmq-spring-boot-starter
                  2.2.2
              
              

            消费者 Consumer

            YAML 配置

            在 SpringBoot 项目的 yml 配置文件中添加以下配置:

            rocketmq:
              name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
            

            创建监听器

            创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component、@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

            @Component
            @RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
            public class MQMsgListener implements RocketMQListener {
                @Override
                public void onMessage(MessageExt message) {
                    String msgId = message.getMsgId();
                    String msg = new String(message.getBody());
                    System.out.println("消息id:"+msgId+"消息内容:"+msg);
                }
            }
            

            @RocketMQMessageListener 注解参数如下:

            参数描述
            topic消费者订阅的主题
            consumerGroup消费者组
            consumeMode消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY】
            messageModel消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING】
            selectorType过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92】
            selectorExpression过滤消息的表达式:Tag | SQL92【`tag1
            maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
            delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

            消息过滤

            Tag 过滤

            消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

            编写并启动消费者项目订阅 tagTopic 主题:

            @Component
            @RocketMQMessageListener(topic = "tagTopic",
                    consumerGroup = "boot-mq-group-consumer",
                    selectorType = SelectorType.TAG,
                    selectorExpression = "java")
            public class MQMsgListener implements RocketMQListener {
                @Override
                public void onMessage(String message) {
                    System.out.println(message);
                }
            }
            

            编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/tag")
                public String sendSyncMessage() {
                    SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
                    return "发送状态:" + result.getSendStatus() + "
            消息id:" + result.getMsgId(); } }

            运行项目,访问接口:http://localhost:8080/send/tag

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528191958989,第2张

            查看 RocketMQ 控制台,可以看到消息带有 java tag:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528191938535,第3张

            查看消费者项目的 IDEA 控制台:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528191142421,第4张

            生产者 Producer

            YAML 配置

            在 SpringBoot 项目的 yml 配置文件中添加以下配置:

            rocketmq:
              name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
              producer:
                group: boot-mq-group-producer # 生产者组名
            

            注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

            发送同步消息

            编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/sync/{msg}")
                public String sendSyncMessage(@PathVariable String msg){
                    SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
                    return "发送状态:"+result.getSendStatus()+"
            消息id:"+result.getMsgId(); } }

            运行项目,访问接口:http://localhost:8080/send/sync/同步消息

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527231022909,第5张

            访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527231142472,第6张

            发送异步消息

            不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

            编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/async/{msg}")
                public String sendAsyncMessage(@PathVariable String msg) {
                    rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("异步消息发送成功");
                        }
                        @Override
                        public void onException(Throwable throwable) {
                            System.out.println("异步消息发送失败");
                        }
                    });
                    System.out.println("异步消息已发送完成");
                    return "发送异步消息";
                }
              
            }
            

            运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527232838438,第7张

            访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527233249499,第8张

            发送单向消息

            编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/oneWay/{msg}")
                public String sendOneWayMessage(@PathVariable String msg) {
                    rocketMQTemplate.sendOneWay("oneWayTopic",msg);
                    return "单向消息发送成功";
                }
            }
            

            运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527233640217,第9张

            访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230527233751658,第10张

            发送延迟消息

            编写并启动消费者项目订阅 delayTopic 主题:

            @Component
            @RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
            public class MQMsgListener implements RocketMQListener {
                @Override
                public void onMessage(MessageExt message) {
                    String msgId = message.getMsgId();
                    String msg = new String(message.getBody());
                    System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
                }
            }
            

            编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/delay/{msg}")
                public String sendDelayMessage(@PathVariable String msg) {
                    Message message = MessageBuilder.withPayload(msg).build();
                    // 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
                    SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
                    return "发送状态:" + result.getSendStatus() + "
            消息id:" + result.getMsgId()+"
            消息发送时间:"+new Date(); } }

            运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528141811562,第11张

            查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528141834080,第12张

            发送顺序消息

            编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

            public class Order {
                //订单号
                private String orderId;
                //订单名称
                private String orderName;
                //订单的流程顺序
                private String seq;
            }
            

            编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

            @Component
            @RocketMQMessageListener(topic = "orderlyTopic",
                    consumerGroup="boot-mq-group-consumer",
                    consumeMode = ConsumeMode.ORDERLY)
            public class MQMsgListener implements RocketMQListener {
                @Override
                public void onMessage(Order message) {
                    System.out.println("消费者:"+message);
                }
            }
            

            编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

            @RestController
            public class ProducerController {
                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                @GetMapping("/send/orderly")
                public String sendOrderlyMessage() {
                    List orders = Arrays.asList(
                            new Order(UUID.randomUUID().toString(), "下订单", "1"),
                            new Order(UUID.randomUUID().toString(), "发短信", "1"),
                            new Order(UUID.randomUUID().toString(), "物流", "1"),
                            new Order(UUID.randomUUID().toString(), "签收", "1"),
                            new Order(UUID.randomUUID().toString(), "下订单", "2"),
                            new Order(UUID.randomUUID().toString(), "发短信", "2"),
                            new Order(UUID.randomUUID().toString(), "物流", "2"),
                            new Order(UUID.randomUUID().toString(), "签收", "2")
                    );
                    //控制流程:下订单->发短信->物流->签收
                    //将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
                    orders.forEach(order -> {
                        rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
                    });
                    return "发送成功";
                }
            }
            

            运行项目,访问接口:http:localhost:8080/send/orderly

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528152807514,第13张

            查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528152925141,第14张

            查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528152848032,第15张

            发送批量消息

            编写并启动消费者项目订阅 batchOrderly 主题:

            @Component
            @RocketMQMessageListener(topic = "batchOrderly",
                    consumerGroup="boot-mq-group-consumer")
            public class MQMsgListener implements RocketMQListener {
                @Override
                public void onMessage(Order message) {
                    System.out.println(Thread.currentThread().getName()+":"+message);
                }
              
            }
            

            编写生产者 Controller,将消息打包成 Collection msgs 传入 syncSend() 方法中发送:

            @RestController
            public class ProducerController {
              @Autowired
              private RocketMQTemplate rocketMQTemplate;
              @GetMapping("/send/batch")
              public String sendOrderlyMessage() {
                List messages = Arrays.asList(
                  MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
                  MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
                  MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
                  MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
                );
                return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
                
              }
            }
            

            运行项目,访问接口:http:localhost:8080/send/batch

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528161620859,第16张

            查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528161706194,第17张

            查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528161804943,第18张

            发送集合消息

            编写并启动消费者项目订阅 listTopic 主题:

            @Component
            @RocketMQMessageListener(topic = "listTopic",
                    consumerGroup="boot-mq-group-consumer")
            public class MQMsgListener implements RocketMQListener> {
                @Override
                public void onMessage(List orders) {
                    orders.forEach(o -> {
                        System.out.println(Thread.currentThread().getName()+":"+o);
                    });
                }
            }
            

            编写生产者 Controller,将集合传入 syncSend() 方法中发送:

            @RestController
            public class ProducerController {
              @Autowired
              private RocketMQTemplate rocketMQTemplate;
              @GetMapping("/send/list")
              public String sendOrderlyMessage() {
                List orders = Arrays.asList(
                  new Order(UUID.randomUUID().toString(), "下订单", "1"),
                  new Order(UUID.randomUUID().toString(), "下订单", "1"),
                  new Order(UUID.randomUUID().toString(), "下订单", "1"),
                  new Order(UUID.randomUUID().toString(), "下订单", "1")
                );
                rocketMQTemplate.syncSend("listTopic",orders);
                return "发送成功";
              }
            }
            

            运行项目,访问接口:http:localhost:8080/send/list

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528161620859,第19张

            查看 RocketMQ 控制台,可以看到队列中一条消息:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528163701846,第20张

            查看消费者项目的 IDEA 控制台,进行消费:

            RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ,image-20230528163745691,第21张