SpringBoot 整合RabbitMQ 之延迟队列实验
作者:mmseoamin日期:2023-12-13

系列文章目录

第一章 Java线程池技术应用

第二章 CountDownLatch和Semaphone的应用

第三章 Spring Cloud 简介

第四章 Spring Cloud Netflix 之 Eureka

第五章 Spring Cloud Netflix 之 Ribbon

第六章 Spring Cloud 之 OpenFeign

第七章 Spring Cloud 之 GateWay

第八章 Spring Cloud Netflix 之 Hystrix

第九章 代码管理gitlab 使用

第十章 SpringCloud Alibaba 之 Nacos discovery

第十一章 SpringCloud Alibaba 之 Nacos Config

第十二章 Spring Cloud Alibaba 之 Sentinel

第十三章 JWT

第十四章 RabbitMQ应用

第十五章 RabbitMQ 延迟队列

SpringBoot 整合RabbitMQ 之延迟队列实验,在这里插入图片描述,第1张


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
    • 前言
    • 1、RabbitMQ延迟队列
      • 1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能
      • 1.2、方式二:安装延迟队列插件
        • 1.2.1、安装延迟队列插件:
        • 2、消息确认机制
          • 2.1、生产确认
          • 2.2、消费确认

            前言

            实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

            1、RabbitMQ延迟队列

            1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

            • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
            • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
            • Dead Letter Exchanges(DLX),即死信交换机
            • Dead Letter Routing Key (DLK),死信路由键
              /***********************延迟队列*************************/
              //创建立即消费队列
              @Bean
              public Queue immediateQueue(){
                  return new Queue("immediateQueue");
              }
              //创建立即消费交换机
              @Bean
              public DirectExchange immediateExchange(){
                  return new DirectExchange("immediateExchange");
              }
              @Bean
              public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){
                  return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");
              }
              //创建延迟队列
              @Bean
              public Queue delayQueue(){
                  Map params = new HashMap<>();
                  //死信队列转发的死信转发到立即处理信息的交换机
                  params.put("x-dead-letter-exchange","immediateExchange");
                  //死信转化携带的routing-key
                  params.put("x-dead-letter-routing-key","immediateRoutingKey");
                  //设置消息过期时间,单位:毫秒
                  params.put("x-message-ttl",60 * 1000);
                  return new Queue("delayQueue",true,false,false,params);
              }
              @Bean
              public DirectExchange delayExchange(){
                  return new DirectExchange("delayExchange");
              }
              @Bean
              public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){
                  return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");
              }
              
              @Test
              public void sendDelay(){
                  this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");
              }
              

              1.2、方式二:安装延迟队列插件

              1.2.1、安装延迟队列插件:

              https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

              下载解压,到plugins目录,执行以下的命令:

              rabbitmq-plugins enable rabbitmq_delayed_message_exchange

              /**************延迟队列一个单一queue*******************/
              @Bean
              public Queue delayNewQueue(){
                  return new Queue("delayNewQueue");
              }
              @Bean
              public CustomExchange delayNewExchange(){
                  Map args = new HashMap<>();
                  // 设置类型,可以为fanout、direct、topic
                  args.put("x-delayed-type", "direct");
                  return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);
              }
              @Bean
              public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){
                  return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();
              }
              
              @Test
              public void sendDelay() {
                  //生产端写完了
                  UserInfo userInfo = new UserInfo();
                  userInfo.setPassword("13432432");
                  userInfo.setUserAccount("kelvin");
                  this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo
                          , a -> {
                              //单位毫秒
                              a.getMessageProperties().setDelay(30000);
                              return a;
                          });
              }
              

              2、消息确认机制

              消息确认分为两部分: 生产确认 和 消费确认。

              生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。

              消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

              2.1、生产确认

              @Bean
              public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
                  RabbitTemplate template = new RabbitTemplate(connectionFactory);
                  //消息发送到交换器Exchange后触发回调
                  template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                      @Override
                      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                          //  可以进行消息入库操作
                          log.info("消息唯一标识 correlationData = {}", correlationData);
                          log.info("确认结果 ack = {}", ack);
                          log.info("失败原因 cause = {}", cause);
                      }
                  });
                  // 配置这个,下面的ReturnCallback 才会起作用
                  template.setMandatory(true);
                  // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
                  template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                      @Override
                      public void returnedMessage(ReturnedMessage returnedMessage) {
                          //  可以进行消息入库操作
                          log.info("消息主体 message = {}", returnedMessage.getMessage());
                          log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());
                          log.info("回复描述 replyText = {}", returnedMessage.getReplyText());
                          log.info("交换机名字 exchange = {}", returnedMessage.getExchange());
                          log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());
                      }
                  });
                  return template;
              }
              
              spring:
                cloud:
                  nacos:
                    discovery:
                      server-addr: localhost:8848
                application:
                  name: drp-user-service  #微服务名称
                datasource:
                  username: root
                  password: root
                  url: jdbc:mysql://127.0.0.1:3306/drp
                  driver-class-name: com.mysql.cj.jdbc.Driver
                rabbitmq:
                  host: 127.0.0.1
                  port: 5672
                  username: root
                  password: root
                  virtual-host: root_vh
                  # 确认消息已发送到交换机(Exchange)
                  publisher-confirm-type: correlated
                  # 确认消息已发送到队列
                  publisher-returns: true
                  listener:
                    simple:
                      acknowledge-mode: manual # 开启消息消费手动确认
                      retry:
                        enabled: true
              

              2.2、消费确认

              @RabbitHandler
              public void process(UserInfo data, Message message, Channel channel){
                  log.info("收到directQueue队列信息:" + data);
                  long deliveryTag = message.getMessageProperties().getDeliveryTag();
                  try {
                      //成功消费确认
                      channel.basicAck(deliveryTag,true);
                      log.info("消费成功确认完毕。。。。。");
                  } catch (IOException e) {
                      log.error("确认消息时抛出异常 ", e);        
                      // 重新确认,成功确认消息
                      try {
                          Thread.sleep(50);
                          channel.basicAck(deliveryTag, true);
                      } catch (IOException | InterruptedException e1) {
                          log.error("确认消息时抛出异常 ", e);
                          // 可以考虑入库
                      }
                  }catch (Exception e){
                      log.error("业务处理失败", e);
                      try {
                          // 失败确认
                          channel.basicNack(deliveryTag, false, false);
                      } catch (IOException e1) {
                          log.error("消息失败确认失败", e1);
                      }
                  }
              }