RabbitMQ---延迟消息
作者:mmseoamin日期:2024-03-20

RabbitMQ---延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息。

延迟任务:设置在一定时间之后才执行的任务。

RabbitMQ---延迟消息,第1张

 延迟消息有以下三种实现方案:

  1. 死信交换机
  2. 延迟消息插件

一、延迟队列

TTL

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

    RabbitMQ---延迟消息,第2张

    死信交换机

    RabbitMQ---延迟消息,第3张

    成为死信(dead letter)的条件:

    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(消费者拒接消费消息,并且不重回队列;)
    • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
    • 队列消息堆积已满,最早的消息可能成为死信

      如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

      RabbitMQ---延迟消息,第4张

      1、声明延迟队列

      package com.itheima.consumer.config;
      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import javax.security.auth.login.CredentialNotFoundException;
      @Configuration
      public class DlxExchangeConfiguration {
          /**
           * 声明 TTL 队列
           * 1. 指定消息的 TTL
           * 2. 指定死信交换机
           * 3. 指定死信交换机的 RoutingKey
           */
          @Bean
          public Queue ttlQueue() {
              return QueueBuilder
                      .durable("ttl.queue") // 指定队列的名称
                      //.ttl(10000) // 指定 TTL 为 10 秒,这里可设置过期时间,但我这里测试在发送消息时设置过期时间
                      .deadLetterExchange("dlx.direct") // 指定死信交换机
                      .deadLetterRoutingKey("dlx") // 指定死信交换机的 RoutingKey
                      .build();
          }
          /**
           * 声明TTl交换机
           * @return
           */
          @Bean
          public DirectExchange directExchange(){
              return new DirectExchange("ttl.direct");
          }
          /**
           * 声明ttl交换机与队列的关联关系
           * @return
           */
          @Bean
          public Binding directBinding(){
              return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
          }
          /**
           * 声明死信交换机
           * @return
           */
          @Bean
          public DirectExchange dlxDirect(){
              return new DirectExchange("dlx.direct");
          }
          /**
           * 声明死信队列
           * @return
           */
          @Bean
          public Queue dlxQueue(){
              return new Queue("dlx.queue");
          }
          /**
           * 声明死信交换机与队列关联关系
           * @return
           */
          @Bean
          public Binding tlxBinding(){
              return BindingBuilder.bind(dlxQueue()).to(dlxDirect()).with("dlx");
          }
      }
      

      2、发送消息

      @Test
          void testSendTTLMessage(){
              rabbitTemplate.convertAndSend("ttl.direct", "ttl", "hello", new MessagePostProcessor() {
                  @Override
                  public Message postProcessMessage(Message message) throws AmqpException {
                      message.getMessageProperties().setExpiration("10000");
                      return message;
                  }
              });
              log.info("消息发送成功!");
          }

      3、死信队列消费消息

      @RabbitListener(queues = "dlx.queue")  //监听的队列:dlx.queue
      public void listenDlxQueue(String msg){
          log.info("dlx.queue的消息:【"+msg+"】");
      }

      4、结果比对

      4.1、发送时间,设置10s过期

      RabbitMQ---延迟消息,第5张

      4.2、死信队列消费消息时间 

      RabbitMQ---延迟消息,第6张

      二、延迟消息插件 

              RabbitMQ官方推出的插件,原生支持延迟消息的功能。其原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

      本地RabbitMQ官网下载rabbitmq_delayer_message_exchange插件地址:

      https://www.rabbitmq.com/community-plugins.html

      RabbitMQ---延迟消息,第7张

      代码实现:

      声明延迟交换机方式一:

       @Bean
          public DirectExchange delayExchange(){
              return ExchangeBuilder
                      .directExchange("delay.direct")
                      .delayed()  //设置delay的属性为true
                      .durable(true)
                      .build();
          }

       方式二:

       

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(name = "delay.queue", durable = "true"),
                  exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed  这是延迟交换机开启
                  key = "delay"
          ))
          public void listenDelayMessage(String msg){
              log.info("接收到delay.queue的延迟消息:{}",msg);
          }

      测试发送消息

      @Test
          void testSendDelayMessage(){
              rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
                  @Override
                  public Message postProcessMessage(Message message) throws AmqpException {
                      message.getMessageProperties().setDelay(1000);
                      return message;
                  }
              });
              log.info("消息发送成功!");
          }

      案例

      电商项目设置30分钟后检测订单支付状态,并完成取消超时订单

      设置30分钟检查订单状态存在两个问题:

      1. 如果并发较高,30分钟可能堆积消息过多,对MQ压力过大
      2. 大多数订单再下单后很短时间内就会支付,但是却需要在MQ内等待30分钟,浪费资源。

      RabbitMQ---延迟消息,第8张

       解决措施:设置多个延迟消息交换机,如设置不同的等待时间:10s、10s、10s、15s、15s....,这些时间相加得到30分钟,不同延迟时间过滤掉大部分的消息,给MQ减压

      RabbitMQ---延迟消息,第9张

      首先先查询支付状态,判断是否支付,如果状态显示未支付,则获取下次延迟时间,判断是否有延迟时间,有则重发延迟消息,没有延迟消息则取消订单。如果订单显示已支付,则标记未已支付。

      RabbitMQ---延迟消息,第10张

      定义延迟时间集合及相关方法:

      package com.itheima.consumer.config;
      import lombok.Data;
      import java.util.Arrays;
      import java.util.List;
      @Data
      public class MultiDelayMessage{
          //消息体
          private T data;
          //记录延迟消息时间的集合
          private List delayMillis;
          public  MultiDelayMessage(T data, List delayMillis){
              this.data = data;
              this.delayMillis = delayMillis;
          }
          public static  MultiDelayMessage of(T data, Long ... delayMillis){
              return new MultiDelayMessage<>(data, Arrays.asList(delayMillis));
          }
          //获取并移除下一个延迟时间
          //Returns: 队列中的第一个延迟时间
          public Long removeNextDelay(){
              return delayMillis.remove(0);
          }
          //是否还有下一个延迟时间
          public boolean hasNextDelay(){
              return !delayMillis.isEmpty();
          }
      }
      

       定义交换机、队列名称以及key:

      public interface MqConstants {
          String DELAY_EXCHANGE = "trade.delay.topic";  //交换机名称
          String DELAY_ORDER_QUEUE = "trade.order.delay.queue"; //队列名称
          String DELAY_PRDER_ROUTING_KEY = "order.query";  //key
      }

      定义延迟消息体:

      import lombok.RequiredArgsConstructor;
      import org.springframework.amqp.AmqpException;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessagePostProcessor;
      @RequiredArgsConstructor
      public class DelayMessageProcessor implements MessagePostProcessor {
          
          private final int delay;
          @Override
          public Message postProcessMessage(Message message) throws AmqpException {
              message.getMessageProperties().setDelay(delay);
              return null;
          }
      }

      实现发送延迟消息:

      /*
           * 生成订单后,发送延迟检查订单消息
           */
          public void testOrderStatic(){
             try{
                 MultiDelayMessage msg = MultiDelayMessage.of("这里是订单ID", 1000L, 1000L, 1000L, 1500L, 1500L); //这里的消息体是订单ID,后面是延迟消息时间集合
                 rabbitTemplate.convertAndSend(
                         MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,msg,
                         new DelayMessageProcessor(msg.removeNextDelay().intValue())
                 );
             }catch (AmqpException e){
                 log.error("延迟消息发送失败!");
             }
          }

      消费延迟消息的大概思路:

       

      import com.itheima.constants.DelayMessageProcessor;
      import com.itheima.constants.MqConstants;
      import com.itheima.constants.MultiDelayMessage;
      import lombok.RequiredArgsConstructor;
      import org.springframework.amqp.core.ExchangeTypes;
      import org.springframework.amqp.rabbit.annotation.*;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      @RequiredArgsConstructor
      public class OrderStatusCheckListener {
          private RabbitTemplate rabbitTemplate;
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
                  exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true" ,type = ExchangeTypes.TOPIC),
                  key = MqConstants.DELAY_PRDER_ROUTING_KEY
          ))
          public void listenOderDelayMessage(MultiDelayMessage msg){
              //1、查询订单状态
              //2、判断是否已支付
                  //2.1订单不存在或者已经处理(订单显示已支付)----交易服务显示已支付,则表示已支付,直接return
              //(交易服务显示未支付的情况下)3、去支付服务查询真正的支付状态   ---- 这里是在交易服务查询显示未支付,但不一定是未支付,需要去支付服务查询确定一下
              //3.1、判断支付服务的订单支付状态,已支付则标记订单状态为已支付,直接return
              //4、判断是否存在延迟时间
              if (msg.hasNextDelay()){
                  //4.1、存在,则重发延迟消息
                  Long nestDelay = msg.removeNextDelay();
                  rabbitTemplate.convertAndSend(
                          MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_PRDER_ROUTING_KEY,
                          msg, new DelayMessageProcessor(nestDelay.intValue())
                  );
                  return;
              }
              //5、不存在,取消订单,修改订单状态为取消订单
              //6、恢复库存
          }
      }